From 7285a99504461ae81a09c23702ed2ae2443b8dbc Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Mon, 7 Oct 2024 14:33:15 +0200 Subject: [PATCH 1/5] fix node connected status flappging (#4587) ## Problem Statement Node connection status has been observed to flap between connected and disconnected states due to race conditions in the `HeartbeatServer` and its interaction with the priority queue for heartbeat management. ## Root Cause Analysis The issue stems from non-atomic operations in the existing heartbeat message [handler](https://github.com/bacalhau-project/bacalhau/blob/44b44efed9ce84043db9be7b9365e98113b2014a/pkg/node/heartbeat/server.go#L202) implementation: 1. Checking for an older heartbeat 2. Removing the old heartbeat 3. Enqueuing a new heartbeat This sequence of operations is vulnerable to race conditions when concurrent heartbeats arrive from the same node, potentially resulting in multiple heartbeats for a single node in the queue. This result in unexpected behaviour as the HashedPriorityQueue is expected to have a single item in the queue for the same key/node ## Why Now? While this bug has existed since version 1.4, it only became apparent with the introduction of concurrent heartbeats in version 1.5. The new version requires nodes to heartbeat to two topics: - The old topic (supported by 1.4 orchestrators) - A new topic (supported by 1.5 orchestrators) As a result, 1.5 orchestrators now receive two concurrent heartbeats from 1.5 compute nodes, exposing the race condition. ## Reproduction Steps 1. Set up a devstack environment with approximately 10 nodes 2. Observe the node connection status 3. Note the flapping between connected and disconnected states ## Solution Instead of simply locking the `HeartbeatServer.Handle()` method, I've implemented a more comprehensive fix to address the underlying issues: 1. Modified `HashedPriorityQueue` to enforce a single item per key atomically within the queue 2. Introduced a `Peek` method to allow `HeartbeatServer` to examine the oldest item without removal and without having to loop over all item using `DequeueWhere` 3. Corrected the priority and ordering of heartbeat events in the queue These changes eliminate the need for manual checks, dequeues, and re-enqueues, while also improving the overall efficiency of the queue operations. ## Implementation Details 1. `HashedPriorityQueue` Modifications: - Ensure atomic operations for maintaining a single item per key - Implement version tracking for items so that enqueues remain fast, while dequeues will lazily filter out and remove items that don't match the latest version for the same key 2. New `Peek` Method: - Allow examination of the oldest item without altering the queue state - Improve efficiency of `HeartbeatServer` operations without having to loop over all item using `DequeueWhere` 3. Heartbeat Event Prioritization: - Adjust priority calculation to ensure oldest events are dequeued first ## Testing Conducted - Enhanced test coverage for `HashedPriorityQueue` to ensure unique items per key - Improved concurrent heartbeat testing in `HeartbeatServer` - Manual testing using devstack environments --- pkg/lib/collections/hashed_priority_queue.go | 137 +++++++++++++--- .../collections/hashed_priority_queue_test.go | 135 ++++++++++++++-- pkg/lib/collections/priority_queue.go | 20 +++ .../collections/priority_queue_base_test.go | 153 ++++++++++++++++++ pkg/lib/collections/priority_queue_test.go | 112 ++++--------- pkg/node/heartbeat/heartbeat_test.go | 121 ++++++++++++++ pkg/node/heartbeat/server.go | 83 +++++----- 7 files changed, 597 insertions(+), 164 deletions(-) create mode 100644 pkg/lib/collections/priority_queue_base_test.go diff --git a/pkg/lib/collections/hashed_priority_queue.go b/pkg/lib/collections/hashed_priority_queue.go index 66f0b2d232..22ba6aeb86 100644 --- a/pkg/lib/collections/hashed_priority_queue.go +++ b/pkg/lib/collections/hashed_priority_queue.go @@ -1,14 +1,60 @@ package collections -import "sync" - +import ( + "sync" +) + +// HashedPriorityQueue is a priority queue that maintains only a single item per unique key. +// It combines the functionality of a hash map and a priority queue to provide efficient +// operations with the following key characteristics: +// +// 1. Single Item Per Key: The queue maintains only the latest version of an item for each +// unique key. When a new item with an existing key is enqueued, it replaces the old item +// instead of adding a duplicate. +// +// 2. Lazy Dequeuing: Outdated items (those that have been replaced by a newer version) are +// not immediately removed from the underlying queue. Instead, they are filtered out +// during dequeue operations. This approach improves enqueue performance +// at the cost of potentially slower dequeue operations. +// +// 3. Higher Enqueue Throughput: By avoiding immediate removal of outdated items, the +// HashedPriorityQueue achieves higher enqueue throughput. This makes it particularly +// suitable for scenarios with frequent updates to existing items. +// +// 4. Eventually Consistent: The queue becomes consistent over time as outdated items are +// lazily removed during dequeue operations. This means that the queue's length and the +// items it contains become accurate as items are dequeued. +// +// 5. Memory Consideration: Due to the lazy removal of outdated items, the underlying queue +// may temporarily hold more items than there are unique keys. This trade-off allows for +// better performance but may use more memory compared to a strictly consistent queue. +// +// Use HashedPriorityQueue when you need a priority queue that efficiently handles updates +// to existing items and can tolerate some latency in removing outdated entries in favor +// of higher enqueue performance. type HashedPriorityQueue[K comparable, T any] struct { - identifiers map[K]struct{} - queue *PriorityQueue[T] + identifiers map[K]int64 + queue *PriorityQueue[versionedItem[T]] mu sync.RWMutex indexer IndexerFunc[K, T] } +// versionedItem wraps the actual data item with a version number. +// This structure is used internally by HashedPriorityQueue to implement +// the versioning mechanism that allows for efficient updates and +// lazy removal of outdated items. The queue is only interested in +// the latest version of an item for each unique key: +// - data: The actual item of type T stored in the queue. +// - version: A monotonically increasing number representing the +// version of this item. When an item with the same key is enqueued, +// its version is incremented. This allows the queue to identify +// the most recent version during dequeue operations and discard +// any older versions of the same item. +type versionedItem[T any] struct { + data T + version int64 +} + // IndexerFunc is used to find the key (of type K) from the provided // item (T). This will be used for the item lookup in `Contains` type IndexerFunc[K comparable, T any] func(item T) K @@ -18,12 +64,24 @@ type IndexerFunc[K comparable, T any] func(item T) K // be used on Enqueue/Dequeue to keep the index up to date. func NewHashedPriorityQueue[K comparable, T any](indexer IndexerFunc[K, T]) *HashedPriorityQueue[K, T] { return &HashedPriorityQueue[K, T]{ - identifiers: make(map[K]struct{}), - queue: NewPriorityQueue[T](), + identifiers: make(map[K]int64), + queue: NewPriorityQueue[versionedItem[T]](), indexer: indexer, } } +// isLatestVersion checks if the given item is the latest version +func (q *HashedPriorityQueue[K, T]) isLatestVersion(item versionedItem[T]) bool { + k := q.indexer(item.data) + currentVersion := q.identifiers[k] + return item.version == currentVersion +} + +// unwrapQueueItem converts a versionedItem to a QueueItem +func (q *HashedPriorityQueue[K, T]) unwrapQueueItem(item *QueueItem[versionedItem[T]]) *QueueItem[T] { + return &QueueItem[T]{Value: item.Value.data, Priority: item.Priority} +} + // Contains will return true if the provided identifier (of type K) // will be found in this queue, false if it is not present. func (q *HashedPriorityQueue[K, T]) Contains(id K) bool { @@ -40,9 +98,9 @@ func (q *HashedPriorityQueue[K, T]) Enqueue(data T, priority int64) { defer q.mu.Unlock() k := q.indexer(data) - - q.identifiers[k] = struct{}{} - q.queue.Enqueue(data, priority) + version := q.identifiers[k] + 1 + q.identifiers[k] = version + q.queue.Enqueue(versionedItem[T]{data: data, version: version}, priority) } // Dequeue returns the next highest priority item, returning both @@ -53,16 +111,39 @@ func (q *HashedPriorityQueue[K, T]) Dequeue() *QueueItem[T] { q.mu.Lock() defer q.mu.Unlock() - item := q.queue.Dequeue() - if item == nil { - return nil + for { + item := q.queue.Dequeue() + if item == nil { + return nil + } + + if q.isLatestVersion(item.Value) { + k := q.indexer(item.Value.data) + delete(q.identifiers, k) + return q.unwrapQueueItem(item) + } } +} + +// Peek returns the next highest priority item without removing it from the queue. +// It returns nil if the queue is empty. +func (q *HashedPriorityQueue[K, T]) Peek() *QueueItem[T] { + q.mu.RLock() + defer q.mu.RUnlock() - // Find the key for the item and delete it from the presence map - k := q.indexer(item.Value) - delete(q.identifiers, k) + for { + item := q.queue.Peek() + if item == nil { + return nil + } - return item + if q.isLatestVersion(item.Value) { + return q.unwrapQueueItem(item) + } + + // If the peeked item is outdated, remove it and continue + q.queue.Dequeue() + } } // DequeueWhere allows the caller to iterate through the queue, in priority order, and @@ -74,26 +155,32 @@ func (q *HashedPriorityQueue[K, T]) DequeueWhere(matcher MatchingFunction[T]) *Q q.mu.Lock() defer q.mu.Unlock() - item := q.queue.DequeueWhere(matcher) - if item == nil { - return nil - } + for { + item := q.queue.DequeueWhere(func(vi versionedItem[T]) bool { + return matcher(vi.data) + }) - k := q.indexer(item.Value) - delete(q.identifiers, k) + if item == nil { + return nil + } - return item + if q.isLatestVersion(item.Value) { + k := q.indexer(item.Value.data) + delete(q.identifiers, k) + return q.unwrapQueueItem(item) + } + } } // Len returns the number of items currently in the queue func (q *HashedPriorityQueue[K, T]) Len() int { - return q.queue.Len() + return len(q.identifiers) } // IsEmpty returns a boolean denoting whether the queue is // currently empty or not. func (q *HashedPriorityQueue[K, T]) IsEmpty() bool { - return q.queue.Len() == 0 + return q.Len() == 0 } var _ PriorityQueueInterface[struct{}] = (*HashedPriorityQueue[string, struct{}])(nil) diff --git a/pkg/lib/collections/hashed_priority_queue_test.go b/pkg/lib/collections/hashed_priority_queue_test.go index 04340b939a..f5838fbe99 100644 --- a/pkg/lib/collections/hashed_priority_queue_test.go +++ b/pkg/lib/collections/hashed_priority_queue_test.go @@ -5,12 +5,21 @@ package collections_test import ( "testing" - "github.com/bacalhau-project/bacalhau/pkg/lib/collections" "github.com/stretchr/testify/suite" + + "github.com/bacalhau-project/bacalhau/pkg/lib/collections" ) type HashedPriorityQueueSuite struct { - suite.Suite + PriorityQueueTestSuite +} + +func (s *HashedPriorityQueueSuite) SetupTest() { + s.NewQueue = func() collections.PriorityQueueInterface[TestData] { + return collections.NewHashedPriorityQueue[string, TestData](func(t TestData) string { + return t.id + }) + } } func TestHashedPriorityQueueSuite(t *testing.T) { @@ -18,20 +27,120 @@ func TestHashedPriorityQueueSuite(t *testing.T) { } func (s *HashedPriorityQueueSuite) TestContains() { - type TestData struct { - id string - data int - } - - indexer := func(t TestData) string { - return t.id - } - - q := collections.NewHashedPriorityQueue[string, TestData](indexer) + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) s.Require().False(q.Contains("A")) - q.Enqueue(TestData{id: "A", data: 0}, 1) + q.Enqueue(TestData{"A", 0}, 1) s.Require().True(q.Contains("A")) _ = q.Dequeue() s.Require().False(q.Contains("A")) } + +func (s *HashedPriorityQueueSuite) TestPeek() { + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) + + q.Enqueue(TestData{"A", 1}, 1) + q.Enqueue(TestData{"B", 2}, 2) + + item := q.Peek() + s.Require().NotNil(item) + s.Require().Equal(TestData{"B", 2}, item.Value) + s.Require().True(q.Contains("A"), "Item A should still be in the queue after Peek") + s.Require().True(q.Contains("B"), "Item B should still be in the queue after Peek") + + _ = q.Dequeue() + s.Require().False(q.Contains("B"), "Item B should not be in the queue after Dequeue") + s.Require().True(q.Contains("A"), "Item A should still be in the queue after Dequeue") +} + +func (s *HashedPriorityQueueSuite) TestSingleItemPerKey() { + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) + + q.Enqueue(TestData{"A", 1}, 1) + q.Enqueue(TestData{"A", 2}, 2) + q.Enqueue(TestData{"A", 3}, 3) + + s.Require().Equal(1, q.Len(), "Queue should only contain one item for key 'A'") + + item := q.Dequeue() + s.Require().NotNil(item) + s.Require().Equal(TestData{"A", 3}, item.Value, "Should return the latest version of item 'A'") + s.Require().Equal(int64(3), item.Priority, "Should have the priority of the latest enqueue") + + s.Require().Nil(q.Dequeue(), "Queue should be empty after dequeuing the single item") +} + +func (s *HashedPriorityQueueSuite) TestPeekReturnsLatestVersion() { + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) + + q.Enqueue(TestData{"A", 1}, 1) + q.Enqueue(TestData{"B", 1}, 3) + q.Enqueue(TestData{"A", 2}, 2) + + item := q.Peek() + s.Require().NotNil(item) + s.Require().Equal(TestData{"B", 1}, item.Value, "Peek should return 'B' as it has the highest priority") + s.Require().Equal(int64(3), item.Priority) + + q.Enqueue(TestData{"B", 2}, 1) // Lower priority, but newer version + + item = q.Peek() + s.Require().NotNil(item) + s.Require().Equal(TestData{"A", 2}, item.Value, "Peek should now return 'A' as 'B' has lower priority") + s.Require().Equal(int64(2), item.Priority) +} + +func (s *HashedPriorityQueueSuite) TestDequeueWhereReturnsLatestVersion() { + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) + + q.Enqueue(TestData{"A", 1}, 1) + q.Enqueue(TestData{"B", 1}, 2) + q.Enqueue(TestData{"A", 2}, 3) + + item := q.DequeueWhere(func(td TestData) bool { + return td.id == "A" + }) + + s.Require().NotNil(item) + s.Require().Equal(TestData{"A", 2}, item.Value, "DequeueWhere should return the latest version of 'A'") + s.Require().Equal(int64(3), item.Priority) + + s.Require().False(q.Contains("A"), "A should no longer be in the queue") + s.Require().True(q.Contains("B"), "B should still be in the queue") +} + +func (s *HashedPriorityQueueSuite) TestDuplicateKeys() { + inputs := []struct { + v TestData + p int64 + }{ + {TestData{"A", 1}, 3}, + {TestData{"B", 2}, 2}, + {TestData{"A", 3}, 1}, // Duplicate key with lower priority + {TestData{"C", 4}, 4}, + {TestData{"B", 5}, 5}, // Duplicate key with higher priority + } + + pq := s.NewQueue() + for _, tc := range inputs { + pq.Enqueue(tc.v, tc.p) + } + + expected := []struct { + v TestData + p int64 + }{ + {TestData{"B", 5}, 5}, + {TestData{"C", 4}, 4}, + {TestData{"A", 3}, 1}, + } + + for _, exp := range expected { + qitem := pq.Dequeue() + s.Require().NotNil(qitem) + s.Require().Equal(exp.v, qitem.Value) + s.Require().Equal(exp.p, qitem.Priority) + } + + s.Require().True(pq.IsEmpty()) +} diff --git a/pkg/lib/collections/priority_queue.go b/pkg/lib/collections/priority_queue.go index 72129a414e..8448111d3e 100644 --- a/pkg/lib/collections/priority_queue.go +++ b/pkg/lib/collections/priority_queue.go @@ -35,6 +35,10 @@ type PriorityQueueInterface[T any] interface { // extra PriorityQueue) for the dequeued items. DequeueWhere(matcher MatchingFunction[T]) *QueueItem[T] + // Peek returns the next highest priority item without removing it from the queue. + // It returns nil if the queue is empty. + Peek() *QueueItem[T] + // Len returns the number of items currently in the queue Len() int @@ -117,6 +121,22 @@ func (pq *PriorityQueue[T]) dequeue() *QueueItem[T] { return &QueueItem[T]{Value: item, Priority: heapItem.priority} } +// Peek returns the next highest priority item without removing it from the queue. +// It returns nil if the queue is empty. +func (pq *PriorityQueue[T]) Peek() *QueueItem[T] { + pq.mu.Lock() + defer pq.mu.Unlock() + + if pq.IsEmpty() { + return nil + } + + heapItem := pq.internalQueue[0] + item, _ := heapItem.value.(T) + + return &QueueItem[T]{Value: item, Priority: heapItem.priority} +} + // DequeueWhere allows the caller to iterate through the queue, in priority order, and // attempt to match an item using the provided `MatchingFunction`. This method has a high // time cost as dequeued but non-matching items must be held and requeued once the process diff --git a/pkg/lib/collections/priority_queue_base_test.go b/pkg/lib/collections/priority_queue_base_test.go new file mode 100644 index 0000000000..0a5db6f5cd --- /dev/null +++ b/pkg/lib/collections/priority_queue_base_test.go @@ -0,0 +1,153 @@ +//go:build unit || !integration + +package collections_test + +import ( + "github.com/stretchr/testify/suite" + + "github.com/bacalhau-project/bacalhau/pkg/lib/collections" +) + +type TestData struct { + id string + data int +} + +type PriorityQueueTestSuite struct { + suite.Suite + NewQueue func() collections.PriorityQueueInterface[TestData] +} + +func (s *PriorityQueueTestSuite) TestSimple() { + type testcase struct { + v TestData + p int64 + } + inputs := []testcase{ + {TestData{"B", 2}, 2}, {TestData{"A", 1}, 3}, {TestData{"C", 3}, 1}, + } + expected := []testcase{ + {TestData{"A", 1}, 3}, {TestData{"B", 2}, 2}, {TestData{"C", 3}, 1}, + } + + pq := s.NewQueue() + for _, tc := range inputs { + pq.Enqueue(tc.v, tc.p) + } + + for _, tc := range expected { + qitem := pq.Dequeue() + s.Require().NotNil(qitem) + s.Require().Equal(tc.v, qitem.Value) + s.Require().Equal(tc.p, qitem.Priority) + } + + s.Require().True(pq.IsEmpty()) +} + +func (s *PriorityQueueTestSuite) TestSimpleMin() { + type testcase struct { + v TestData + p int64 + } + inputs := []testcase{ + {TestData{"B", 2}, -2}, {TestData{"A", 1}, -3}, {TestData{"C", 3}, -1}, + } + expected := []testcase{ + {TestData{"C", 3}, -1}, {TestData{"B", 2}, -2}, {TestData{"A", 1}, -3}, + } + + pq := s.NewQueue() + for _, tc := range inputs { + pq.Enqueue(tc.v, tc.p) + } + + for _, tc := range expected { + qitem := pq.Dequeue() + s.Require().NotNil(qitem) + s.Require().Equal(tc.v, qitem.Value) + s.Require().Equal(tc.p, qitem.Priority) + } + + s.Require().True(pq.IsEmpty()) +} + +func (s *PriorityQueueTestSuite) TestEmpty() { + pq := s.NewQueue() + qitem := pq.Dequeue() + s.Require().Nil(qitem) + s.Require().True(pq.IsEmpty()) +} + +func (s *PriorityQueueTestSuite) TestDequeueWhere() { + pq := s.NewQueue() + pq.Enqueue(TestData{"A", 1}, 4) + pq.Enqueue(TestData{"D", 4}, 1) + pq.Enqueue(TestData{"D", 4}, 1) + pq.Enqueue(TestData{"D", 4}, 1) + pq.Enqueue(TestData{"D", 4}, 1) + pq.Enqueue(TestData{"B", 2}, 3) + pq.Enqueue(TestData{"C", 3}, 2) + + count := pq.Len() + + qitem := pq.DequeueWhere(func(possibleMatch TestData) bool { + return possibleMatch.id == "B" + }) + + s.Require().NotNil(qitem) + s.Require().Equal(TestData{"B", 2}, qitem.Value) + s.Require().Equal(int64(3), qitem.Priority) + s.Require().Equal(count-1, pq.Len()) +} + +func (s *PriorityQueueTestSuite) TestDequeueWhereFail() { + pq := s.NewQueue() + pq.Enqueue(TestData{"A", 1}, 4) + + qitem := pq.DequeueWhere(func(possibleMatch TestData) bool { + return possibleMatch.id == "Z" + }) + + s.Require().Nil(qitem) +} + +func (s *PriorityQueueTestSuite) TestPeek() { + pq := s.NewQueue() + + // Test 1: Peek on an empty queue + item := pq.Peek() + s.Require().Nil(item, "Peek on an empty queue should return nil") + + // Test 2: Peek after adding one item + pq.Enqueue(TestData{"A", 1}, 1) + item = pq.Peek() + s.Require().NotNil(item, "Peek should return an item") + s.Require().Equal(TestData{"A", 1}, item.Value, "Peek should return the correct value") + s.Require().Equal(int64(1), item.Priority, "Peek should return the correct priority") + s.Require().Equal(1, pq.Len(), "Peek should not remove the item from the queue") + + // Test 3: Peek with multiple items + pq.Enqueue(TestData{"B", 2}, 3) + pq.Enqueue(TestData{"C", 3}, 2) + item = pq.Peek() + s.Require().NotNil(item, "Peek should return an item") + s.Require().Equal(TestData{"B", 2}, item.Value, "Peek should return the highest priority item") + s.Require().Equal(int64(3), item.Priority, "Peek should return the correct priority") + s.Require().Equal(3, pq.Len(), "Peek should not remove any items from the queue") + + // Test 4: Peek after dequeue + dequeuedItem := pq.Dequeue() + s.Require().Equal(TestData{"B", 2}, dequeuedItem.Value, "Dequeue should return the highest priority item") + item = pq.Peek() + s.Require().NotNil(item, "Peek should return an item") + s.Require().Equal(TestData{"C", 3}, item.Value, "Peek should return the new highest priority item after dequeue") + s.Require().Equal(int64(2), item.Priority, "Peek should return the correct priority") + s.Require().Equal(2, pq.Len(), "Queue length should be reduced after dequeue") + + // Test 5: Multiple peeks should return the same item + item1 := pq.Peek() + item2 := pq.Peek() + s.Require().Equal(item1, item2, "Multiple peeks should return the same item") + s.Require().Equal(2, pq.Len(), "Multiple peeks should not change the queue length") +} diff --git a/pkg/lib/collections/priority_queue_test.go b/pkg/lib/collections/priority_queue_test.go index 9a2d115ab8..ec3adeb23d 100644 --- a/pkg/lib/collections/priority_queue_test.go +++ b/pkg/lib/collections/priority_queue_test.go @@ -5,109 +5,59 @@ package collections_test import ( "testing" - "github.com/bacalhau-project/bacalhau/pkg/lib/collections" "github.com/stretchr/testify/suite" + + "github.com/bacalhau-project/bacalhau/pkg/lib/collections" ) type PriorityQueueSuite struct { - suite.Suite + PriorityQueueTestSuite +} + +func (s *PriorityQueueSuite) SetupTest() { + s.NewQueue = func() collections.PriorityQueueInterface[TestData] { + return collections.NewPriorityQueue[TestData]() + } } func TestPriorityQueueSuite(t *testing.T) { suite.Run(t, new(PriorityQueueSuite)) } -func (s *PriorityQueueSuite) TestSimple() { - type testcase struct { - v string +func (s *PriorityQueueSuite) TestDuplicateKeys() { + inputs := []struct { + v TestData p int64 - } - inputs := []testcase{ - {"B", 2}, {"A", 3}, {"C", 1}, {"A", 3}, {"C", 1}, {"B", 2}, - } - expected := []testcase{ - {"A", 3}, {"A", 3}, {"B", 2}, {"B", 2}, {"C", 1}, {"C", 1}, + }{ + {TestData{"A", 1}, 3}, + {TestData{"B", 2}, 2}, + {TestData{"A", 3}, 1}, // Duplicate key with lower priority + {TestData{"C", 4}, 4}, + {TestData{"B", 5}, 5}, // Duplicate key with higher priority } - pq := collections.NewPriorityQueue[string]() + pq := s.NewQueue() for _, tc := range inputs { - pq.Enqueue(tc.v, int64(tc.p)) + pq.Enqueue(tc.v, tc.p) } - for _, tc := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(tc.v, qitem.Value) - s.Require().Equal(tc.p, qitem.Priority) - } - - s.Require().True(pq.IsEmpty()) -} - -func (s *PriorityQueueSuite) TestSimpleMin() { - type testcase struct { - v string + expected := []struct { + v TestData p int64 - } - inputs := []testcase{ - {"B", -2}, {"A", -3}, {"C", -1}, {"A", -3}, {"C", -1}, {"B", -2}, - } - expected := []testcase{ - {"C", -1}, {"C", -1}, {"B", -2}, {"B", -2}, {"A", -3}, {"A", -3}, + }{ + {TestData{"B", 5}, 5}, + {TestData{"C", 4}, 4}, + {TestData{"A", 1}, 3}, + {TestData{"B", 2}, 2}, + {TestData{"A", 3}, 1}, } - pq := collections.NewPriorityQueue[string]() - for _, tc := range inputs { - pq.Enqueue(tc.v, int64(tc.p)) - } - - for _, tc := range expected { + for _, exp := range expected { qitem := pq.Dequeue() s.Require().NotNil(qitem) - s.Require().Equal(tc.v, qitem.Value) - s.Require().Equal(tc.p, qitem.Priority) + s.Require().Equal(exp.v, qitem.Value) + s.Require().Equal(exp.p, qitem.Priority) } s.Require().True(pq.IsEmpty()) } - -func (s *PriorityQueueSuite) TestEmpty() { - pq := collections.NewPriorityQueue[string]() - qitem := pq.Dequeue() - s.Require().Nil(qitem) - s.Require().True(pq.IsEmpty()) -} - -func (s *PriorityQueueSuite) TestDequeueWhere() { - pq := collections.NewPriorityQueue[string]() - pq.Enqueue("A", 4) - pq.Enqueue("D", 1) - pq.Enqueue("D", 1) - pq.Enqueue("D", 1) - pq.Enqueue("D", 1) - pq.Enqueue("B", 3) - pq.Enqueue("C", 2) - - count := pq.Len() - - qitem := pq.DequeueWhere(func(possibleMatch string) bool { - return possibleMatch == "B" - }) - - s.Require().NotNil(qitem) - s.Require().Equal("B", qitem.Value) - s.Require().Equal(int64(3), qitem.Priority) - s.Require().Equal(count-1, pq.Len()) - -} - -func (s *PriorityQueueSuite) TestDequeueWhereFail() { - pq := collections.NewPriorityQueue[string]() - pq.Enqueue("A", 4) - - qitem := pq.DequeueWhere(func(possibleMatch string) bool { - return possibleMatch == "Z" - }) - - s.Require().Nil(qitem) -} diff --git a/pkg/node/heartbeat/heartbeat_test.go b/pkg/node/heartbeat/heartbeat_test.go index e9b5743620..a1a814cf66 100644 --- a/pkg/node/heartbeat/heartbeat_test.go +++ b/pkg/node/heartbeat/heartbeat_test.go @@ -6,12 +6,14 @@ import ( "context" "fmt" "strconv" + "sync" "testing" "time" "github.com/benbjohnson/clock" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/bacalhau-project/bacalhau/pkg/lib/ncl" @@ -226,3 +228,122 @@ func (s *HeartbeatTestSuite) TestSendHeartbeatError() { err = client.SendHeartbeat(ctx, 1) s.Error(err) } + +func (s *HeartbeatTestSuite) TestConcurrentHeartbeats() { + ctx := context.Background() + numNodes := 10 + numHeartbeatsPerNode := 100 + + var wg sync.WaitGroup + wg.Add(numNodes) + + for i := 0; i < numNodes; i++ { + go func(nodeID string) { + defer wg.Done() + client, err := NewClient(s.natsConn, nodeID, s.publisher) + require.NoError(s.T(), err) + + for j := 0; j < numHeartbeatsPerNode; j++ { + s.Require().NoError(client.SendHeartbeat(ctx, uint64(j))) + time.Sleep(time.Millisecond) // Small delay to simulate real-world scenario + } + }(fmt.Sprintf("node-%d", i)) + } + + wg.Wait() + + // Allow time for all heartbeats to be processed + time.Sleep(100 * time.Millisecond) + + // Verify that all nodes are marked as HEALTHY + for i := 0; i < numNodes; i++ { + nodeID := fmt.Sprintf("node-%d", i) + nodeState := &models.NodeState{Info: models.NodeInfo{NodeID: nodeID}} + s.heartbeatServer.UpdateNodeInfo(nodeState) + s.Require().Equal(models.NodeStates.HEALTHY, nodeState.Connection) + } +} + +func (s *HeartbeatTestSuite) TestConcurrentHeartbeatsWithDisconnection() { + ctx := context.Background() + numNodes := 5 + numHeartbeatsPerNode := 50 + + var wg sync.WaitGroup + wg.Add(numNodes) + + for i := 0; i < numNodes; i++ { + go func(nodeID string) { + defer wg.Done() + client, err := NewClient(s.natsConn, nodeID, s.publisher) + require.NoError(s.T(), err) + + for j := 0; j < numHeartbeatsPerNode; j++ { + s.Require().NoError(client.SendHeartbeat(ctx, uint64(j))) + time.Sleep(time.Millisecond) + + if j == numHeartbeatsPerNode/2 { + // Simulate a disconnection by advancing the clock + s.clock.Add(10 * time.Second) + } + } + }(fmt.Sprintf("node-%d", i)) + } + + wg.Wait() + + // Allow time for all heartbeats to be processed + time.Sleep(100 * time.Millisecond) + + // Verify node states + for i := 0; i < numNodes; i++ { + nodeID := fmt.Sprintf("node-%d", i) + nodeState := &models.NodeState{Info: models.NodeInfo{NodeID: nodeID}} + s.heartbeatServer.UpdateNodeInfo(nodeState) + + // The exact state might vary depending on timing, but it should be either HEALTHY or DISCONNECTED + s.Require().Contains([]models.NodeConnectionState{models.NodeStates.HEALTHY, models.NodeStates.DISCONNECTED}, nodeState.Connection) + } +} + +func (s *HeartbeatTestSuite) TestConcurrentHeartbeatsAndChecks() { + ctx := context.Background() + numNodes := 5 + numHeartbeatsPerNode := 30 + checkInterval := 50 * time.Millisecond + + var wg sync.WaitGroup + wg.Add(numNodes + 1) // +1 for the checker goroutine + + // Start the checker goroutine + go func() { + defer wg.Done() + for i := 0; i < numHeartbeatsPerNode; i++ { + s.heartbeatServer.checkQueue(ctx) + time.Sleep(checkInterval) + } + }() + + for i := 0; i < numNodes; i++ { + go func(nodeID string) { + defer wg.Done() + client, err := NewClient(s.natsConn, nodeID, s.publisher) + require.NoError(s.T(), err) + + for j := 0; j < numHeartbeatsPerNode; j++ { + s.Require().NoError(client.SendHeartbeat(ctx, uint64(j))) + time.Sleep(checkInterval / 2) // Send heartbeats faster than checks + } + }(fmt.Sprintf("node-%d", i)) + } + + wg.Wait() + + // Verify final node states + for i := 0; i < numNodes; i++ { + nodeID := fmt.Sprintf("node-%d", i) + nodeState := &models.NodeState{Info: models.NodeInfo{NodeID: nodeID}} + s.heartbeatServer.UpdateNodeInfo(nodeState) + s.Require().Equal(models.NodeStates.HEALTHY, nodeState.Connection) + } +} diff --git a/pkg/node/heartbeat/server.go b/pkg/node/heartbeat/server.go index 5d77751ff3..ef3d901f69 100644 --- a/pkg/node/heartbeat/server.go +++ b/pkg/node/heartbeat/server.go @@ -113,7 +113,7 @@ func (h *HeartbeatServer) Start(ctx context.Context) error { case <-ctx.Done(): return case <-ticker.C: - h.CheckQueue(ctx) + h.checkQueue(ctx) } } }(ctx) @@ -125,34 +125,49 @@ func (h *HeartbeatServer) Start(ctx context.Context) error { return nil } -// CheckQueue will check the queue for old heartbeats that might make a node's +// checkQueue will check the queue for old heartbeats that might make a node's // liveness either unhealthy or unknown, and will update the node's status accordingly. -func (h *HeartbeatServer) CheckQueue(ctx context.Context) { - // These are the timestamps, below which we'll consider the item in one of those two - // states - nowStamp := h.clock.Now().UTC().Unix() - disconnectedUnder := nowStamp - int64(h.disconnectedAfter.Seconds()) +// This method is not thread-safe and should be called from a single goroutine. +func (h *HeartbeatServer) checkQueue(ctx context.Context) { + // Calculate the timestamp threshold for considering a node as disconnected + disconnectedUnder := h.clock.Now().Add(-h.disconnectedAfter).UTC().Unix() for { - // Dequeue anything older than the unknown timestamp - item := h.pqueue.DequeueWhere(func(item TimestampedHeartbeat) bool { - return item.Timestamp < disconnectedUnder - }) - - // We haven't found anything old enough yet. We can stop the loop and wait - // for the next cycle. - if item == nil { + // Peek at the next (oldest) item in the queue + peek := h.pqueue.Peek() + + // If the queue is empty, we're done + if peek == nil { + break + } + + // If the oldest item is recent enough, we're done + log.Ctx(ctx).Trace(). + Dur("LastHeartbeatAge", h.clock.Now().Sub(time.Unix(peek.Value.Timestamp, 0))). + Msgf("Peeked at %+v", peek) + if peek.Value.Timestamp >= disconnectedUnder { break } + // Dequeue the item and mark the node as disconnected + item := h.pqueue.Dequeue() + if item == nil || item.Value.Timestamp >= disconnectedUnder { + // This should never happen, but we'll check just in case + log.Warn().Msgf("Unexpected item dequeued: %+v didn't match previously peeked item: %+v", item, peek) + continue + } + if item.Value.NodeID == h.nodeID { // We don't want to mark ourselves as disconnected continue } - if item.Value.Timestamp < disconnectedUnder { - h.markNodeAs(item.Value.NodeID, models.NodeStates.DISCONNECTED) - } + log.Ctx(ctx).Debug(). + Str("NodeID", item.Value.NodeID). + Int64("LastHeartbeat", item.Value.Timestamp). + Dur("LastHeartbeatAge", h.clock.Now().Sub(time.Unix(item.Value.Timestamp, 0))). + Msg("Marking node as disconnected") + h.markNodeAs(item.Value.NodeID, models.NodeStates.DISCONNECTED) } } @@ -200,36 +215,14 @@ func (h *HeartbeatServer) ShouldProcess(ctx context.Context, message *ncl.Messag // Handle will handle a message received through the legacy heartbeat topic func (h *HeartbeatServer) Handle(ctx context.Context, heartbeat Heartbeat) error { - log.Ctx(ctx).Trace().Msgf("heartbeat received from %s", heartbeat.NodeID) - timestamp := h.clock.Now().UTC().Unix() + th := TimestampedHeartbeat{Heartbeat: heartbeat, Timestamp: timestamp} + log.Ctx(ctx).Trace().Msgf("Enqueueing heartbeat from %s with seq %d. %+v", th.NodeID, th.Sequence, th) - if h.pqueue.Contains(heartbeat.NodeID) { - // If we think we already have a heartbeat from this node, we'll update the - // timestamp of the entry so it is re-prioritized in the queue by dequeuing - // and re-enqueuing it (this will ensure it is heapified correctly). - result := h.pqueue.DequeueWhere(func(item TimestampedHeartbeat) bool { - return item.NodeID == heartbeat.NodeID - }) - - if result == nil { - log.Ctx(ctx).Warn().Msgf("consistency error in heartbeat heap, node %s not found", heartbeat.NodeID) - return nil - } - - log.Ctx(ctx).Trace().Msgf("Re-enqueueing heartbeat from %s", heartbeat.NodeID) - result.Value.Timestamp = timestamp - h.pqueue.Enqueue(result.Value, timestamp) - } else { - log.Ctx(ctx).Trace().Msgf("Enqueueing heartbeat from %s", heartbeat.NodeID) - - // We'll enqueue the heartbeat message with the current timestamp. The older - // the entry, the lower the timestamp (trending to 0) and the higher the priority. - h.pqueue.Enqueue(TimestampedHeartbeat{Heartbeat: heartbeat, Timestamp: timestamp}, timestamp) - } - + // We'll enqueue the heartbeat message with the current timestamp in reverse priority so that + // older heartbeats are dequeued first. + h.pqueue.Enqueue(th, -timestamp) h.markNodeAs(heartbeat.NodeID, models.NodeStates.HEALTHY) - return nil } From f9d5c971d381a26e8f9393ee7f04de4016cba13c Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Mon, 7 Oct 2024 16:01:16 +0200 Subject: [PATCH 2/5] set logging dest to be stdout, and simplify cmd logging (#4588) This PR creates a simplified logging mode for command line outputs and makes it the default mode, and only apply configurable log mode with `serve` command. Also sets the logging destination as `stdout` instead of `stderr` --- cmd/cli/config/list.go | 2 +- cmd/cli/devstack/devstack.go | 5 +- cmd/cli/root.go | 6 +- cmd/cli/serve/serve.go | 5 +- cmd/util/repo.go | 7 ++ pkg/logger/logger.go | 121 +++++++++++++++++++---------------- pkg/logger/logger_test.go | 66 ++++++++++++++++++- pkg/setup/setup.go | 4 -- 8 files changed, 145 insertions(+), 71 deletions(-) diff --git a/cmd/cli/config/list.go b/cmd/cli/config/list.go index c3d970b132..905f443811 100644 --- a/cmd/cli/config/list.go +++ b/cmd/cli/config/list.go @@ -41,7 +41,7 @@ Each key shown can be used with: if err != nil { return err } - log.Info().Msgf("Config loaded from: %s, and with data-dir %s", cfg.Paths(), cfg.Get(types.DataDirKey)) + log.Debug().Msgf("Config loaded from: %s, and with data-dir %s", cfg.Paths(), cfg.Get(types.DataDirKey)) return list(cmd, cfg, o) }, } diff --git a/cmd/cli/devstack/devstack.go b/cmd/cli/devstack/devstack.go index 8eb9102566..80fb3ed4d2 100644 --- a/cmd/cli/devstack/devstack.go +++ b/cmd/cli/devstack/devstack.go @@ -6,6 +6,7 @@ import ( "path/filepath" "strconv" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/viper" "k8s.io/kubectl/pkg/util/i18n" @@ -103,9 +104,7 @@ func NewCmd() *cobra.Command { RunE: func(cmd *cobra.Command, _ []string) error { // TODO: a hack to force debug logging for devstack // until I figure out why flags and env vars are not working - if err := logger.ConfigureLogging(string(logger.LogModeDefault), "debug"); err != nil { - return fmt.Errorf("failed to configure logging: %w", err) - } + logger.ConfigureLogging(logger.LogModeDefault, zerolog.DebugLevel) return runDevstack(cmd, ODs) }, } diff --git a/cmd/cli/root.go b/cmd/cli/root.go index 9677911d71..8df28fa5ee 100644 --- a/cmd/cli/root.go +++ b/cmd/cli/root.go @@ -76,15 +76,11 @@ func NewRootCmd() *cobra.Command { // While we allow users to configure logging via the config file, they are applied // and will override this configuration at a later stage when the config is loaded. // This is needed to ensure any logs before the config is loaded are captured. - logMode := viper.GetString(types.LoggingModeKey) - if logMode == "" { - logMode = string(logger.LogModeDefault) - } logLevel := viper.GetString(types.LoggingLevelKey) if logLevel == "" { logLevel = "Info" } - if err := logger.ConfigureLogging(logMode, logLevel); err != nil { + if err := logger.ParseAndConfigureLogging(string(logger.LogModeCmd), logLevel); err != nil { return fmt.Errorf("failed to configure logging: %w", err) } diff --git a/cmd/cli/serve/serve.go b/cmd/cli/serve/serve.go index 2ca436e133..8886b58ae4 100644 --- a/cmd/cli/serve/serve.go +++ b/cmd/cli/serve/serve.go @@ -95,6 +95,10 @@ func NewCmd() *cobra.Command { return fmt.Errorf("failed to setup config: %w", err) } + if err = logger.ParseAndConfigureLogging(cfg.Logging.Mode, cfg.Logging.Level); err != nil { + return fmt.Errorf("failed to configure logging: %w", err) + } + log.Info().Msgf("Config loaded from: %s, and with data-dir %s", rawCfg.Paths(), rawCfg.Get(types.DataDirKey)) @@ -103,7 +107,6 @@ func NewCmd() *cobra.Command { if err != nil { return fmt.Errorf("failed to reconcile repo: %w", err) } - return serve(cmd, cfg, fsr) }, } diff --git a/cmd/util/repo.go b/cmd/util/repo.go index 042041a056..a295d53661 100644 --- a/cmd/util/repo.go +++ b/cmd/util/repo.go @@ -12,6 +12,7 @@ import ( "github.com/bacalhau-project/bacalhau/cmd/util/hook" "github.com/bacalhau-project/bacalhau/pkg/config" "github.com/bacalhau-project/bacalhau/pkg/config/types" + "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/repo" "github.com/bacalhau-project/bacalhau/pkg/setup" ) @@ -75,6 +76,12 @@ func SetupConfigType(cmd *cobra.Command) (*config.Config, error) { if err != nil { return nil, err } + + // We always apply the configured logging level. Logging mode on the other hand is only applied with serve cmd + if err = logger.ParseAndConfigureLoggingLevel(cfg.Get(types.LoggingLevelKey).(string)); err != nil { + return nil, fmt.Errorf("failed to configure logging: %w", err) + } + return cfg, nil } diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index c053cd9b94..dc34a033c0 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -13,6 +13,7 @@ import ( "runtime/debug" "strconv" "strings" + "sync" "time" "github.com/rs/zerolog/pkgerrors" @@ -29,22 +30,31 @@ type LogMode string // Available logging modes const ( - LogModeDefault LogMode = "default" - LogModeStation LogMode = "station" - LogModeJSON LogMode = "json" - LogModeCombined LogMode = "combined" - LogModeEvent LogMode = "event" + LogModeDefault LogMode = "default" + LogModeJSON LogMode = "json" + LogModeCmd LogMode = "cmd" +) + +var ( + logMu sync.Mutex ) func ParseLogMode(s string) (LogMode, error) { - lm := []LogMode{LogModeDefault, LogModeStation, LogModeJSON, LogModeCombined, LogModeEvent} + lm := []LogMode{LogModeDefault, LogModeJSON, LogModeCmd} for _, logMode := range lm { if strings.ToLower(s) == strings.ToLower(string(logMode)) { return logMode, nil } } - return "Error", fmt.Errorf("%q is an invalid log-mode (valid modes: %q)", - s, lm) + return "", fmt.Errorf("%q is an invalid log-mode (valid modes: %q)", s, lm) +} + +func ParseLogLevel(s string) (zerolog.Level, error) { + l, err := zerolog.ParseLevel(s) + if err != nil { + return l, fmt.Errorf("%q is an invalid log-level", s) + } + return l, nil } var nodeIDFieldName = "NodeID" @@ -57,12 +67,14 @@ func init() { //nolint:gochecknoinits strings.HasSuffix(os.Args[0], ".test") || flag.Lookup("test.v") != nil || flag.Lookup("test.run") != nil { - configureLogging(zerolog.DebugLevel, defaultLogging()) + ConfigureLoggingLevel(zerolog.DebugLevel) + configureLogging(defaultLogging()) return } // the default log level when not running a test is ERROR - configureLogging(zerolog.ErrorLevel, bufferLogs()) + ConfigureLoggingLevel(zerolog.ErrorLevel) + configureLogging(bufferLogs()) } func ErrOrDebug(err error) zerolog.Level { @@ -82,46 +94,66 @@ type tTesting interface { func ConfigureTestLogging(t tTesting) { oldLogger := log.Logger oldContextLogger := zerolog.DefaultContextLogger - configureLogging(zerolog.DebugLevel, zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t), defaultLogFormat)) + ConfigureLoggingLevel(zerolog.DebugLevel) + configureLogging(zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t), defaultLogFormat)) t.Cleanup(func() { log.Logger = oldLogger zerolog.DefaultContextLogger = oldContextLogger }) } -func ConfigureLogging(modeStr, levelStr string) error { - logModeConfig := defaultLogging() - +func ParseAndConfigureLogging(modeStr, levelStr string) error { mode, err := ParseLogMode(modeStr) if err != nil { - return fmt.Errorf("invalid log mode: %w", err) + return err } - level, err := zerolog.ParseLevel(levelStr) + level, err := ParseLogLevel(levelStr) if err != nil { - return fmt.Errorf("invalid log level: %w", err) + return err } + ConfigureLogging(mode, level) + return nil +} + +func ConfigureLogging(mode LogMode, level zerolog.Level) { + var logWriter io.Writer switch mode { case LogModeDefault: - logModeConfig = defaultLogging() - case LogModeStation: - logModeConfig = defaultStationLogging() + logWriter = defaultLogging() case LogModeJSON: - logModeConfig = jsonLogging() - case LogModeEvent: - logModeConfig = eventLogging() - case LogModeCombined: - logModeConfig = combinedLogging() + logWriter = jsonLogging() + case LogModeCmd: + logWriter = clientLogging() + default: + logWriter = defaultLogging() } - configureLogging(level, logModeConfig) - LogBufferedLogs(logModeConfig) + ConfigureLoggingLevel(level) + configureLogging(logWriter) + LogBufferedLogs(logWriter) +} + +func ParseAndConfigureLoggingLevel(level string) error { + l, err := ParseLogLevel(level) + if err != nil { + return err + } + ConfigureLoggingLevel(l) return nil } -func configureLogging(level zerolog.Level, logWriter io.Writer) { - zerolog.TimeFieldFormat = time.RFC3339Nano +func ConfigureLoggingLevel(level zerolog.Level) { + logMu.Lock() + defer logMu.Unlock() zerolog.SetGlobalLevel(level) +} + +func configureLogging(logWriter io.Writer) { + logMu.Lock() + defer logMu.Unlock() + + zerolog.TimeFieldFormat = time.RFC3339Nano info, ok := debug.ReadBuildInfo() if ok && info.Main.Path != "" { @@ -148,21 +180,13 @@ func jsonLogging() io.Writer { return os.Stdout } -func eventLogging() io.Writer { - return io.Discard -} - -func combinedLogging() io.Writer { - return zerolog.MultiLevelWriter(defaultLogging(), os.Stdout) -} - func defaultLogging() io.Writer { return zerolog.NewConsoleWriter(defaultLogFormat) } func defaultLogFormat(w *zerolog.ConsoleWriter) { isTerminal := isatty.IsTerminal(os.Stdout.Fd()) - w.Out = os.Stderr + w.Out = os.Stdout w.NoColor = !isTerminal w.TimeFormat = "15:04:05.999 |" w.PartsOrder = []string{ @@ -187,25 +211,10 @@ func defaultLogFormat(w *zerolog.ConsoleWriter) { } } -func defaultStationLogging() io.Writer { +func clientLogging() io.Writer { return zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { - isTerminal := isatty.IsTerminal(os.Stdout.Fd()) - w.Out = os.Stdout - w.NoColor = !isTerminal - w.PartsOrder = []string{ - zerolog.LevelFieldName, - zerolog.MessageFieldName, - } - - w.FormatLevel = func(i interface{}) string { - return strings.ToUpper(i.(string)) + ":" - } - w.FormatErrFieldName = func(i interface{}) string { - return "- " - } - w.FormatErrFieldValue = func(i interface{}) string { - return strings.Trim(i.(string), "\"") - } + defaultLogFormat(w) + w.PartsOrder = []string{zerolog.MessageFieldName} }) } diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index ac25c6536d..7786c4b979 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -23,7 +23,8 @@ func TestConfigureLogging(t *testing.T) { }) var logging strings.Builder - configureLogging(zerolog.InfoLevel, zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { + ConfigureLoggingLevel(zerolog.InfoLevel) + configureLogging(zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { defaultLogFormat(w) w.Out = &logging w.NoColor = true @@ -40,3 +41,66 @@ func TestConfigureLogging(t *testing.T) { assert.Contains(t, actual, "pkg/logger/testpackage/subpackage/subsubpackage/testutil.go", "Log statement doesn't contain the full package path") assert.Contains(t, actual, `stack:[{"func":"TestLog","line":`, "Log statement didn't automatically include the error's stacktrace") } + +func TestParseAndConfigureLogging(t *testing.T) { + err := ParseAndConfigureLogging("default", "debug") + assert.NoError(t, err) + assert.Equal(t, zerolog.DebugLevel, zerolog.GlobalLevel()) + + err = ParseAndConfigureLogging("json", "info") + assert.NoError(t, err) + assert.Equal(t, zerolog.InfoLevel, zerolog.GlobalLevel()) + + err = ParseAndConfigureLogging("invalid", "error") + assert.Error(t, err) + + err = ParseAndConfigureLogging("default", "invalid") + assert.Error(t, err) +} + +func TestParseLogMode(t *testing.T) { + tests := []struct { + input string + expected LogMode + hasError bool + }{ + {"default", LogModeDefault, false}, + {"json", LogModeJSON, false}, + {"cmd", LogModeCmd, false}, + {"invalid", "", true}, + } + + for _, test := range tests { + result, err := ParseLogMode(test.input) + if test.hasError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, result) + } + } +} + +func TestParseLogLevel(t *testing.T) { + tests := []struct { + input string + expected zerolog.Level + hasError bool + }{ + {"debug", zerolog.DebugLevel, false}, + {"info", zerolog.InfoLevel, false}, + {"warn", zerolog.WarnLevel, false}, + {"error", zerolog.ErrorLevel, false}, + {"invalid", zerolog.NoLevel, true}, + } + + for _, test := range tests { + result, err := ParseLogLevel(test.input) + if test.hasError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, result) + } + } +} diff --git a/pkg/setup/setup.go b/pkg/setup/setup.go index 345268831e..d270600bec 100644 --- a/pkg/setup/setup.go +++ b/pkg/setup/setup.go @@ -9,7 +9,6 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/config" "github.com/bacalhau-project/bacalhau/pkg/config/types" - "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/repo/migrations" "github.com/bacalhau-project/bacalhau/pkg/repo" @@ -25,9 +24,6 @@ func SetupMigrationManager() (*repo.MigrationManager, error) { // SetupBacalhauRepo ensures that a bacalhau repo and config exist and are initialized. func SetupBacalhauRepo(cfg types.Bacalhau) (*repo.FsRepo, error) { - if err := logger.ConfigureLogging(cfg.Logging.Mode, cfg.Logging.Level); err != nil { - return nil, fmt.Errorf("failed to configure logging: %w", err) - } migrationManger, err := SetupMigrationManager() if err != nil { return nil, fmt.Errorf("failed to create migration manager: %w", err) From 83130ed03cc1c715f577a5fae834da92f6d22de3 Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Mon, 7 Oct 2024 18:14:50 +0200 Subject: [PATCH 3/5] Release.1.5 (#4594) --- ops/terraform/dev.tfvars | 4 ++-- ops/terraform/main.tf | 7 +++---- ops/terraform/prod.tfvars | 4 ++-- .../remote_files/scripts/start-bacalhau.sh | 18 ++++++++++-------- ops/terraform/stage.tfvars | 4 ++-- 5 files changed, 19 insertions(+), 18 deletions(-) diff --git a/ops/terraform/dev.tfvars b/ops/terraform/dev.tfvars index d1dbfa7e0b..cf966dcb6a 100644 --- a/ops/terraform/dev.tfvars +++ b/ops/terraform/dev.tfvars @@ -1,8 +1,8 @@ -bacalhau_version = "v1.4.0" +bacalhau_version = "v1.5.0" bacalhau_branch = "" bacalhau_port = "1235" bacalhau_environment = "development" -ipfs_version = "v0.12.2" +ipfs_version = "v0.18.1" gcp_project = "bacalhau-dev" grafana_cloud_prometheus_user = "14299" grafana_cloud_prometheus_endpoint = "https://prometheus-us-central1.grafana.net/api/prom/push" diff --git a/ops/terraform/main.tf b/ops/terraform/main.tf index ec6c73d682..da5d9fa6d5 100644 --- a/ops/terraform/main.tf +++ b/ops/terraform/main.tf @@ -67,11 +67,10 @@ export GRAFANA_CLOUD_TEMPO_ENDPOINT="${var.grafana_cloud_tempo_endpoint}" export OTEL_COLLECTOR_VERSION="${var.otel_collector_version}" export OTEL_EXPORTER_OTLP_ENDPOINT="${var.otel_collector_endpoint}" export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=${terraform.workspace}" -export BACALHAU_NODE_NETWORK_ORCHESTRATORS="${var.internal_ip_addresses[0]}:4222" -export BACALHAU_NODE_NETWORK_ADVERTISEDADDRESS="${var.public_ip_addresses[count.index]}:4222" -export BACALHAU_NODE_NETWORK_CLUSTER_PEERS="" +export BACALHAU_ORCHESTRATORS="${var.internal_ip_addresses[0]}:4222" +export BACALHAU_ORCHESTRATOR_ADVERTISE="${var.public_ip_addresses[count.index]}:4222" export BACALHAU_LOCAL_PUBLISHER_ADDRESS="${var.public_ip_addresses[count.index]}" - +export BACALHAU_WEBUI_BACKEND="http://${var.public_ip_addresses[0]}:1234" ### secrets are installed in the install-node.sh script export SECRETS_GRAFANA_CLOUD_PROMETHEUS_API_KEY="${var.grafana_cloud_prometheus_api_key}" diff --git a/ops/terraform/prod.tfvars b/ops/terraform/prod.tfvars index f123692123..c626227559 100644 --- a/ops/terraform/prod.tfvars +++ b/ops/terraform/prod.tfvars @@ -1,7 +1,7 @@ -bacalhau_version = "v1.4.0" +bacalhau_version = "v1.5.0" bacalhau_port = "1235" bacalhau_environment = "production" -ipfs_version = "v0.12.2" +ipfs_version = "v0.18.1" gcp_project = "bacalhau-prod" grafana_cloud_prometheus_user = "1008771" grafana_cloud_prometheus_endpoint = "https://prometheus-us-central1.grafana.net/api/prom/push" diff --git a/ops/terraform/remote_files/scripts/start-bacalhau.sh b/ops/terraform/remote_files/scripts/start-bacalhau.sh index 318faefd82..252fbb148c 100644 --- a/ops/terraform/remote_files/scripts/start-bacalhau.sh +++ b/ops/terraform/remote_files/scripts/start-bacalhau.sh @@ -36,12 +36,14 @@ bacalhau serve \ --job-execution-timeout-bypass-client-id="${TRUSTED_CLIENT_IDS}" \ --ipfs-connect /ip4/127.0.0.1/tcp/5001 \ --api-port 1234 \ - --web-ui="${BACALHAU_NODE_WEBUI}" \ - --web-ui-port 80 \ - --labels owner=bacalhau \ --requester-job-translation-enabled \ - --config Job.Defaults.Batch.Task.Publisher.Type=local \ - --config Job.Defaults.Ops.Task.Publisher.Type=local \ - --config Job.Defaults.Service.Task.Publisher.Type=local \ - --config Job.Defaults.Daemon.Task.Publisher.Type=local \ - --local-publisher-address "${BACALHAU_LOCAL_PUBLISHER_ADDRESS}" + --config DisableAnalytics \ + --config labels="owner=bacalhau,name=node-${TERRAFORM_NODE_INDEX}"\ + --config Compute.Orchestrators="${BACALHAU_ORCHESTRATORS}" \ + --config Orchestrator.Advertise="${BACALHAU_ORCHESTRATOR_ADVERTISE}" \ + --config WebUI.Enabled="${BACALHAU_NODE_WEBUI}" \ + --config WebUI.Listen=0.0.0.0:80 \ + --config WebUI.Backend="${BACALHAU_WEBUI_BACKEND}" \ + --config JobDefaults.Batch.Task.Publisher.Type=local \ + --config JobDefaults.Ops.Task.Publisher.Type=local \ + --config Publishers.Types.Local.Address="${BACALHAU_LOCAL_PUBLISHER_ADDRESS}" diff --git a/ops/terraform/stage.tfvars b/ops/terraform/stage.tfvars index 544b263869..628f78e0ee 100644 --- a/ops/terraform/stage.tfvars +++ b/ops/terraform/stage.tfvars @@ -1,8 +1,8 @@ -bacalhau_version = "v1.5.0-dev2" +bacalhau_version = "v1.5.0" bacalhau_branch = "" # deploy from a branch instead of the version above bacalhau_port = "1235" bacalhau_environment = "staging" -ipfs_version = "v0.12.2" +ipfs_version = "v0.18.1" gcp_project = "bacalhau-stage" grafana_cloud_prometheus_user = "1008771" grafana_cloud_prometheus_endpoint = "https://prometheus-us-central1.grafana.net/api/prom/push" From 11545a9aa9499b34d338058e22896eaa0c5b1769 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 8 Oct 2024 11:11:33 +0200 Subject: [PATCH 4/5] Bump next from 14.2.7 to 14.2.10 in /webui (#4453) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bumps [next](https://github.com/vercel/next.js) from 14.2.7 to 14.2.10.
Release notes

Sourced from next's releases.

v14.2.10

[!NOTE]
This release is backporting bug fixes. It does not include all pending features/changes on canary.

Core Changes

Credits

Huge thanks to @​huozhi and @​ijjk for helping!

v14.2.9

[!NOTE]
This release is backporting bug fixes. It does not include all pending features/changes on canary.

Core Changes

  • Revert "Fix esm property def in flight loader (#66990)" (#69749)
  • Disable experimental.optimizeServer by default to fix failed server action (#69788)
  • Fix middleware fallback: false case (#69799)
  • Fix status code for /_not-found route (#64058) (#69808)
  • Fix metadata prop merging (#69807)
  • create-next-app: fix font file corruption when using import alias (#69806)

Credits

Huge thanks to @​huozhi, @​ztanner, @​ijjk, and @​lubieowoce for helping!

v14.2.8

What's Changed

[!NOTE]
This release is backporting bug fixes and minor improvements. It does not include all pending features/changes on canary.

Support esmExternals in app directory

  • Support esm externals in app router (#65041)
  • Turbopack: Allow client components from foreign code in app routes (#64751)
  • Turbopack: add support for esm externals in app dir (#64918)
  • other related PRs: #66990 #66727 #66286 #65519

Reading cookies set in middleware in components and actions

  • initialize ALS with cookies in middleware (#65008)
  • fix middleware cookie initialization (#65820)
  • ensure cookies set in middleware can be read in a server action (#67924)
  • fix: merged middleware cookies should preserve options (#67956)

... (truncated)

Commits

[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=next&package-manager=npm_and_yarn&previous-version=14.2.7&new-version=14.2.10)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores) Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) ---
Dependabot commands and options
You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot merge` will merge this PR after your CI passes on it - `@dependabot squash and merge` will squash and merge this PR after your CI passes on it - `@dependabot cancel merge` will cancel a previously requested merge and block automerging - `@dependabot reopen` will reopen this PR if it is closed - `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually - `@dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency - `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself) You can disable automated security fix PRs for this repo from the [Security Alerts page](https://github.com/bacalhau-project/bacalhau/network/alerts).
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Walid Baruni --- webui/package.json | 2 +- webui/yarn.lock | 94 +++++++++++++++++++++++----------------------- 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/webui/package.json b/webui/package.json index 6f4ad63331..114b626a3a 100644 --- a/webui/package.json +++ b/webui/package.json @@ -27,7 +27,7 @@ "class-variance-authority": "^0.7.0", "clsx": "^2.1.1", "lucide-react": "^0.438.0", - "next": "14.2.7", + "next": "14.2.10", "next-themes": "^0.3.0", "react": "^18", "react-dom": "^18", diff --git a/webui/yarn.lock b/webui/yarn.lock index f645055fda..bbbe05d0bc 100644 --- a/webui/yarn.lock +++ b/webui/yarn.lock @@ -223,10 +223,10 @@ __metadata: languageName: node linkType: hard -"@next/env@npm:14.2.7": - version: 14.2.7 - resolution: "@next/env@npm:14.2.7" - checksum: 10c0/1cda023007acda4d47036a25fba0e039d9b2df9c3770651dc289207e0537506675546c02b5b574fe92bb1adc1c887d948d5cb630673aa572754278b82d150b7e +"@next/env@npm:14.2.10": + version: 14.2.10 + resolution: "@next/env@npm:14.2.10" + checksum: 10c0/e13ad3bb18f576a62a011f08393bd20cf025c3e3aa378d9df5c10f53b63a6eacd43b47aee00ffc8b2eb7988e4af58a1e1504a8e115aad753b35f3ee1cdbbc37a languageName: node linkType: hard @@ -239,65 +239,65 @@ __metadata: languageName: node linkType: hard -"@next/swc-darwin-arm64@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-darwin-arm64@npm:14.2.7" +"@next/swc-darwin-arm64@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-darwin-arm64@npm:14.2.10" conditions: os=darwin & cpu=arm64 languageName: node linkType: hard -"@next/swc-darwin-x64@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-darwin-x64@npm:14.2.7" +"@next/swc-darwin-x64@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-darwin-x64@npm:14.2.10" conditions: os=darwin & cpu=x64 languageName: node linkType: hard -"@next/swc-linux-arm64-gnu@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-linux-arm64-gnu@npm:14.2.7" +"@next/swc-linux-arm64-gnu@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-linux-arm64-gnu@npm:14.2.10" conditions: os=linux & cpu=arm64 & libc=glibc languageName: node linkType: hard -"@next/swc-linux-arm64-musl@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-linux-arm64-musl@npm:14.2.7" +"@next/swc-linux-arm64-musl@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-linux-arm64-musl@npm:14.2.10" conditions: os=linux & cpu=arm64 & libc=musl languageName: node linkType: hard -"@next/swc-linux-x64-gnu@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-linux-x64-gnu@npm:14.2.7" +"@next/swc-linux-x64-gnu@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-linux-x64-gnu@npm:14.2.10" conditions: os=linux & cpu=x64 & libc=glibc languageName: node linkType: hard -"@next/swc-linux-x64-musl@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-linux-x64-musl@npm:14.2.7" +"@next/swc-linux-x64-musl@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-linux-x64-musl@npm:14.2.10" conditions: os=linux & cpu=x64 & libc=musl languageName: node linkType: hard -"@next/swc-win32-arm64-msvc@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-win32-arm64-msvc@npm:14.2.7" +"@next/swc-win32-arm64-msvc@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-win32-arm64-msvc@npm:14.2.10" conditions: os=win32 & cpu=arm64 languageName: node linkType: hard -"@next/swc-win32-ia32-msvc@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-win32-ia32-msvc@npm:14.2.7" +"@next/swc-win32-ia32-msvc@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-win32-ia32-msvc@npm:14.2.10" conditions: os=win32 & cpu=ia32 languageName: node linkType: hard -"@next/swc-win32-x64-msvc@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-win32-x64-msvc@npm:14.2.7" +"@next/swc-win32-x64-msvc@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-win32-x64-msvc@npm:14.2.10" conditions: os=win32 & cpu=x64 languageName: node linkType: hard @@ -4071,20 +4071,20 @@ __metadata: languageName: node linkType: hard -"next@npm:14.2.7": - version: 14.2.7 - resolution: "next@npm:14.2.7" - dependencies: - "@next/env": "npm:14.2.7" - "@next/swc-darwin-arm64": "npm:14.2.7" - "@next/swc-darwin-x64": "npm:14.2.7" - "@next/swc-linux-arm64-gnu": "npm:14.2.7" - "@next/swc-linux-arm64-musl": "npm:14.2.7" - "@next/swc-linux-x64-gnu": "npm:14.2.7" - "@next/swc-linux-x64-musl": "npm:14.2.7" - "@next/swc-win32-arm64-msvc": "npm:14.2.7" - "@next/swc-win32-ia32-msvc": "npm:14.2.7" - "@next/swc-win32-x64-msvc": "npm:14.2.7" +"next@npm:14.2.10": + version: 14.2.10 + resolution: "next@npm:14.2.10" + dependencies: + "@next/env": "npm:14.2.10" + "@next/swc-darwin-arm64": "npm:14.2.10" + "@next/swc-darwin-x64": "npm:14.2.10" + "@next/swc-linux-arm64-gnu": "npm:14.2.10" + "@next/swc-linux-arm64-musl": "npm:14.2.10" + "@next/swc-linux-x64-gnu": "npm:14.2.10" + "@next/swc-linux-x64-musl": "npm:14.2.10" + "@next/swc-win32-arm64-msvc": "npm:14.2.10" + "@next/swc-win32-ia32-msvc": "npm:14.2.10" + "@next/swc-win32-x64-msvc": "npm:14.2.10" "@swc/helpers": "npm:0.5.5" busboy: "npm:1.6.0" caniuse-lite: "npm:^1.0.30001579" @@ -4125,7 +4125,7 @@ __metadata: optional: true bin: next: dist/bin/next - checksum: 10c0/661ff5196f671d68ece76f3003d049848163cb21a946fe71673225e56f82726f647a1869085f010869a778e56dd601b9065221031823562aeb92026677f8a5fd + checksum: 10c0/a64991d44db2b6dcb3e7b780f14fe138fa97052767cf96a7733d1de4a857834e19a31fd5a742cca14201ad800bf03924d613e3d4e99932a1112bb9a03662f95a languageName: node linkType: hard @@ -5737,7 +5737,7 @@ __metadata: eslint-config-prettier: "npm:^9.1.0" eslint-plugin-prettier: "npm:^5.2.1" lucide-react: "npm:^0.438.0" - next: "npm:14.2.7" + next: "npm:14.2.10" next-themes: "npm:^0.3.0" openapi-typescript-codegen: "npm:^0.29.0" postcss: "npm:^8" From 764831dcbfe2f8d811b8dcc67363394960359427 Mon Sep 17 00:00:00 2001 From: Jamil Shamy <4977827+jamlo@users.noreply.github.com> Date: Tue, 8 Oct 2024 10:51:16 -0400 Subject: [PATCH 5/5] Improve API error handling (#4607) This PR tries to fix the "FromHttpResponse" function. Returning 2 errors from a function can be confusing and uncommon in Golang, especially if the function only returns these 2 errors. The new code will return one error, which is always an APIError, and it also improves the error messaging wording, adding more details in the output. Several more tweaks can be done to the current error message handling if needed. Note: this is identical to [PR 4565](https://github.com/bacalhau-project/bacalhau/pull/4565), though created from a branch, and not a fork. --- pkg/publicapi/apimodels/error.go | 19 ++++++++++++------- pkg/publicapi/client/v2/client.go | 27 ++++++++------------------- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go index 6d2bc98bfd..842fe5734e 100644 --- a/pkg/publicapi/apimodels/error.go +++ b/pkg/publicapi/apimodels/error.go @@ -2,7 +2,6 @@ package apimodels import ( "encoding/json" - "errors" "fmt" "io" "net/http" @@ -67,23 +66,29 @@ func (e *APIError) Error() string { } // Parse HTTP Resposne to APIError -func FromHttpResponse(resp *http.Response) (*APIError, error) { - +func GenerateAPIErrorFromHTTPResponse(resp *http.Response) *APIError { if resp == nil { - return nil, errors.New("response is nil, cannot be unmarsheld to APIError") + return NewAPIError(0, "API call error, invalid response") } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("error reading response body: %w", err) + return NewAPIError( + resp.StatusCode, + fmt.Sprintf("Unable to read API call response body. Error: %q", err.Error())) } var apiErr APIError err = json.Unmarshal(body, &apiErr) if err != nil { - return nil, fmt.Errorf("error parsing response body: %w", err) + return NewAPIError( + resp.StatusCode, + fmt.Sprintf("Unable to parse API call response body. Error: %q. Body received: %q", + err.Error(), + string(body), + )) } // If the JSON didn't include a status code, use the HTTP Status @@ -91,7 +96,7 @@ func FromHttpResponse(resp *http.Response) (*APIError, error) { apiErr.HTTPStatusCode = resp.StatusCode } - return &apiErr, nil + return &apiErr } // FromBacError converts a bacerror.Error to an APIError diff --git a/pkg/publicapi/client/v2/client.go b/pkg/publicapi/client/v2/client.go index 18f687ed34..95979028be 100644 --- a/pkg/publicapi/client/v2/client.go +++ b/pkg/publicapi/client/v2/client.go @@ -74,18 +74,12 @@ func (c *httpClient) Get(ctx context.Context, endpoint string, in apimodels.GetR return apimodels.NewUnauthorizedError("invalid token") } - var apiError *apimodels.APIError if resp.StatusCode != http.StatusOK { - apiError, err = apimodels.FromHttpResponse(resp) - if err != nil { - return err + if apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp); apiError != nil { + return apiError } } - if apiError != nil { - return apiError - } - defer resp.Body.Close() if out != nil { @@ -116,18 +110,12 @@ func (c *httpClient) write(ctx context.Context, verb, endpoint string, in apimod return apimodels.ErrInvalidToken } - var apiError *apimodels.APIError if resp.StatusCode != http.StatusOK { - apiError, err = apimodels.FromHttpResponse(resp) - if err != nil { - return err + if apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp); apiError != nil { + return apiError } } - if apiError != nil { - return apiError - } - if out != nil { if err := decodeBody(resp, &out); err != nil { return err @@ -362,12 +350,13 @@ func (c *httpClient) interceptError(ctx context.Context, err error, resp *http.R WithCode(bacerrors.UnauthorizedError) } - apiError, apiErr := apimodels.FromHttpResponse(resp) - if apiErr == nil { + apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp) + if apiError != nil { return apiError.ToBacError() } - return bacerrors.Wrap(apiErr, "server error"). + return bacerrors.New("server error"). + WithHTTPStatusCode(http.StatusInternalServerError). WithCode(bacerrors.InternalError) }