diff --git a/cmd/cli/config/list.go b/cmd/cli/config/list.go index c3d970b132..905f443811 100644 --- a/cmd/cli/config/list.go +++ b/cmd/cli/config/list.go @@ -41,7 +41,7 @@ Each key shown can be used with: if err != nil { return err } - log.Info().Msgf("Config loaded from: %s, and with data-dir %s", cfg.Paths(), cfg.Get(types.DataDirKey)) + log.Debug().Msgf("Config loaded from: %s, and with data-dir %s", cfg.Paths(), cfg.Get(types.DataDirKey)) return list(cmd, cfg, o) }, } diff --git a/cmd/cli/devstack/devstack.go b/cmd/cli/devstack/devstack.go index 8eb9102566..80fb3ed4d2 100644 --- a/cmd/cli/devstack/devstack.go +++ b/cmd/cli/devstack/devstack.go @@ -6,6 +6,7 @@ import ( "path/filepath" "strconv" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/viper" "k8s.io/kubectl/pkg/util/i18n" @@ -103,9 +104,7 @@ func NewCmd() *cobra.Command { RunE: func(cmd *cobra.Command, _ []string) error { // TODO: a hack to force debug logging for devstack // until I figure out why flags and env vars are not working - if err := logger.ConfigureLogging(string(logger.LogModeDefault), "debug"); err != nil { - return fmt.Errorf("failed to configure logging: %w", err) - } + logger.ConfigureLogging(logger.LogModeDefault, zerolog.DebugLevel) return runDevstack(cmd, ODs) }, } diff --git a/cmd/cli/root.go b/cmd/cli/root.go index 9677911d71..8df28fa5ee 100644 --- a/cmd/cli/root.go +++ b/cmd/cli/root.go @@ -76,15 +76,11 @@ func NewRootCmd() *cobra.Command { // While we allow users to configure logging via the config file, they are applied // and will override this configuration at a later stage when the config is loaded. // This is needed to ensure any logs before the config is loaded are captured. - logMode := viper.GetString(types.LoggingModeKey) - if logMode == "" { - logMode = string(logger.LogModeDefault) - } logLevel := viper.GetString(types.LoggingLevelKey) if logLevel == "" { logLevel = "Info" } - if err := logger.ConfigureLogging(logMode, logLevel); err != nil { + if err := logger.ParseAndConfigureLogging(string(logger.LogModeCmd), logLevel); err != nil { return fmt.Errorf("failed to configure logging: %w", err) } diff --git a/cmd/cli/serve/serve.go b/cmd/cli/serve/serve.go index 2ca436e133..8886b58ae4 100644 --- a/cmd/cli/serve/serve.go +++ b/cmd/cli/serve/serve.go @@ -95,6 +95,10 @@ func NewCmd() *cobra.Command { return fmt.Errorf("failed to setup config: %w", err) } + if err = logger.ParseAndConfigureLogging(cfg.Logging.Mode, cfg.Logging.Level); err != nil { + return fmt.Errorf("failed to configure logging: %w", err) + } + log.Info().Msgf("Config loaded from: %s, and with data-dir %s", rawCfg.Paths(), rawCfg.Get(types.DataDirKey)) @@ -103,7 +107,6 @@ func NewCmd() *cobra.Command { if err != nil { return fmt.Errorf("failed to reconcile repo: %w", err) } - return serve(cmd, cfg, fsr) }, } diff --git a/cmd/util/repo.go b/cmd/util/repo.go index 042041a056..a295d53661 100644 --- a/cmd/util/repo.go +++ b/cmd/util/repo.go @@ -12,6 +12,7 @@ import ( "github.com/bacalhau-project/bacalhau/cmd/util/hook" "github.com/bacalhau-project/bacalhau/pkg/config" "github.com/bacalhau-project/bacalhau/pkg/config/types" + "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/repo" "github.com/bacalhau-project/bacalhau/pkg/setup" ) @@ -75,6 +76,12 @@ func SetupConfigType(cmd *cobra.Command) (*config.Config, error) { if err != nil { return nil, err } + + // We always apply the configured logging level. Logging mode on the other hand is only applied with serve cmd + if err = logger.ParseAndConfigureLoggingLevel(cfg.Get(types.LoggingLevelKey).(string)); err != nil { + return nil, fmt.Errorf("failed to configure logging: %w", err) + } + return cfg, nil } diff --git a/ops/terraform/dev.tfvars b/ops/terraform/dev.tfvars index d1dbfa7e0b..cf966dcb6a 100644 --- a/ops/terraform/dev.tfvars +++ b/ops/terraform/dev.tfvars @@ -1,8 +1,8 @@ -bacalhau_version = "v1.4.0" +bacalhau_version = "v1.5.0" bacalhau_branch = "" bacalhau_port = "1235" bacalhau_environment = "development" -ipfs_version = "v0.12.2" +ipfs_version = "v0.18.1" gcp_project = "bacalhau-dev" grafana_cloud_prometheus_user = "14299" grafana_cloud_prometheus_endpoint = "https://prometheus-us-central1.grafana.net/api/prom/push" diff --git a/ops/terraform/main.tf b/ops/terraform/main.tf index ec6c73d682..da5d9fa6d5 100644 --- a/ops/terraform/main.tf +++ b/ops/terraform/main.tf @@ -67,11 +67,10 @@ export GRAFANA_CLOUD_TEMPO_ENDPOINT="${var.grafana_cloud_tempo_endpoint}" export OTEL_COLLECTOR_VERSION="${var.otel_collector_version}" export OTEL_EXPORTER_OTLP_ENDPOINT="${var.otel_collector_endpoint}" export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=${terraform.workspace}" -export BACALHAU_NODE_NETWORK_ORCHESTRATORS="${var.internal_ip_addresses[0]}:4222" -export BACALHAU_NODE_NETWORK_ADVERTISEDADDRESS="${var.public_ip_addresses[count.index]}:4222" -export BACALHAU_NODE_NETWORK_CLUSTER_PEERS="" +export BACALHAU_ORCHESTRATORS="${var.internal_ip_addresses[0]}:4222" +export BACALHAU_ORCHESTRATOR_ADVERTISE="${var.public_ip_addresses[count.index]}:4222" export BACALHAU_LOCAL_PUBLISHER_ADDRESS="${var.public_ip_addresses[count.index]}" - +export BACALHAU_WEBUI_BACKEND="http://${var.public_ip_addresses[0]}:1234" ### secrets are installed in the install-node.sh script export SECRETS_GRAFANA_CLOUD_PROMETHEUS_API_KEY="${var.grafana_cloud_prometheus_api_key}" diff --git a/ops/terraform/prod.tfvars b/ops/terraform/prod.tfvars index f123692123..c626227559 100644 --- a/ops/terraform/prod.tfvars +++ b/ops/terraform/prod.tfvars @@ -1,7 +1,7 @@ -bacalhau_version = "v1.4.0" +bacalhau_version = "v1.5.0" bacalhau_port = "1235" bacalhau_environment = "production" -ipfs_version = "v0.12.2" +ipfs_version = "v0.18.1" gcp_project = "bacalhau-prod" grafana_cloud_prometheus_user = "1008771" grafana_cloud_prometheus_endpoint = "https://prometheus-us-central1.grafana.net/api/prom/push" diff --git a/ops/terraform/remote_files/scripts/start-bacalhau.sh b/ops/terraform/remote_files/scripts/start-bacalhau.sh index 318faefd82..252fbb148c 100644 --- a/ops/terraform/remote_files/scripts/start-bacalhau.sh +++ b/ops/terraform/remote_files/scripts/start-bacalhau.sh @@ -36,12 +36,14 @@ bacalhau serve \ --job-execution-timeout-bypass-client-id="${TRUSTED_CLIENT_IDS}" \ --ipfs-connect /ip4/127.0.0.1/tcp/5001 \ --api-port 1234 \ - --web-ui="${BACALHAU_NODE_WEBUI}" \ - --web-ui-port 80 \ - --labels owner=bacalhau \ --requester-job-translation-enabled \ - --config Job.Defaults.Batch.Task.Publisher.Type=local \ - --config Job.Defaults.Ops.Task.Publisher.Type=local \ - --config Job.Defaults.Service.Task.Publisher.Type=local \ - --config Job.Defaults.Daemon.Task.Publisher.Type=local \ - --local-publisher-address "${BACALHAU_LOCAL_PUBLISHER_ADDRESS}" + --config DisableAnalytics \ + --config labels="owner=bacalhau,name=node-${TERRAFORM_NODE_INDEX}"\ + --config Compute.Orchestrators="${BACALHAU_ORCHESTRATORS}" \ + --config Orchestrator.Advertise="${BACALHAU_ORCHESTRATOR_ADVERTISE}" \ + --config WebUI.Enabled="${BACALHAU_NODE_WEBUI}" \ + --config WebUI.Listen=0.0.0.0:80 \ + --config WebUI.Backend="${BACALHAU_WEBUI_BACKEND}" \ + --config JobDefaults.Batch.Task.Publisher.Type=local \ + --config JobDefaults.Ops.Task.Publisher.Type=local \ + --config Publishers.Types.Local.Address="${BACALHAU_LOCAL_PUBLISHER_ADDRESS}" diff --git a/ops/terraform/stage.tfvars b/ops/terraform/stage.tfvars index 544b263869..628f78e0ee 100644 --- a/ops/terraform/stage.tfvars +++ b/ops/terraform/stage.tfvars @@ -1,8 +1,8 @@ -bacalhau_version = "v1.5.0-dev2" +bacalhau_version = "v1.5.0" bacalhau_branch = "" # deploy from a branch instead of the version above bacalhau_port = "1235" bacalhau_environment = "staging" -ipfs_version = "v0.12.2" +ipfs_version = "v0.18.1" gcp_project = "bacalhau-stage" grafana_cloud_prometheus_user = "1008771" grafana_cloud_prometheus_endpoint = "https://prometheus-us-central1.grafana.net/api/prom/push" diff --git a/pkg/lib/collections/hashed_priority_queue.go b/pkg/lib/collections/hashed_priority_queue.go index 66f0b2d232..22ba6aeb86 100644 --- a/pkg/lib/collections/hashed_priority_queue.go +++ b/pkg/lib/collections/hashed_priority_queue.go @@ -1,14 +1,60 @@ package collections -import "sync" - +import ( + "sync" +) + +// HashedPriorityQueue is a priority queue that maintains only a single item per unique key. +// It combines the functionality of a hash map and a priority queue to provide efficient +// operations with the following key characteristics: +// +// 1. Single Item Per Key: The queue maintains only the latest version of an item for each +// unique key. When a new item with an existing key is enqueued, it replaces the old item +// instead of adding a duplicate. +// +// 2. Lazy Dequeuing: Outdated items (those that have been replaced by a newer version) are +// not immediately removed from the underlying queue. Instead, they are filtered out +// during dequeue operations. This approach improves enqueue performance +// at the cost of potentially slower dequeue operations. +// +// 3. Higher Enqueue Throughput: By avoiding immediate removal of outdated items, the +// HashedPriorityQueue achieves higher enqueue throughput. This makes it particularly +// suitable for scenarios with frequent updates to existing items. +// +// 4. Eventually Consistent: The queue becomes consistent over time as outdated items are +// lazily removed during dequeue operations. This means that the queue's length and the +// items it contains become accurate as items are dequeued. +// +// 5. Memory Consideration: Due to the lazy removal of outdated items, the underlying queue +// may temporarily hold more items than there are unique keys. This trade-off allows for +// better performance but may use more memory compared to a strictly consistent queue. +// +// Use HashedPriorityQueue when you need a priority queue that efficiently handles updates +// to existing items and can tolerate some latency in removing outdated entries in favor +// of higher enqueue performance. type HashedPriorityQueue[K comparable, T any] struct { - identifiers map[K]struct{} - queue *PriorityQueue[T] + identifiers map[K]int64 + queue *PriorityQueue[versionedItem[T]] mu sync.RWMutex indexer IndexerFunc[K, T] } +// versionedItem wraps the actual data item with a version number. +// This structure is used internally by HashedPriorityQueue to implement +// the versioning mechanism that allows for efficient updates and +// lazy removal of outdated items. The queue is only interested in +// the latest version of an item for each unique key: +// - data: The actual item of type T stored in the queue. +// - version: A monotonically increasing number representing the +// version of this item. When an item with the same key is enqueued, +// its version is incremented. This allows the queue to identify +// the most recent version during dequeue operations and discard +// any older versions of the same item. +type versionedItem[T any] struct { + data T + version int64 +} + // IndexerFunc is used to find the key (of type K) from the provided // item (T). This will be used for the item lookup in `Contains` type IndexerFunc[K comparable, T any] func(item T) K @@ -18,12 +64,24 @@ type IndexerFunc[K comparable, T any] func(item T) K // be used on Enqueue/Dequeue to keep the index up to date. func NewHashedPriorityQueue[K comparable, T any](indexer IndexerFunc[K, T]) *HashedPriorityQueue[K, T] { return &HashedPriorityQueue[K, T]{ - identifiers: make(map[K]struct{}), - queue: NewPriorityQueue[T](), + identifiers: make(map[K]int64), + queue: NewPriorityQueue[versionedItem[T]](), indexer: indexer, } } +// isLatestVersion checks if the given item is the latest version +func (q *HashedPriorityQueue[K, T]) isLatestVersion(item versionedItem[T]) bool { + k := q.indexer(item.data) + currentVersion := q.identifiers[k] + return item.version == currentVersion +} + +// unwrapQueueItem converts a versionedItem to a QueueItem +func (q *HashedPriorityQueue[K, T]) unwrapQueueItem(item *QueueItem[versionedItem[T]]) *QueueItem[T] { + return &QueueItem[T]{Value: item.Value.data, Priority: item.Priority} +} + // Contains will return true if the provided identifier (of type K) // will be found in this queue, false if it is not present. func (q *HashedPriorityQueue[K, T]) Contains(id K) bool { @@ -40,9 +98,9 @@ func (q *HashedPriorityQueue[K, T]) Enqueue(data T, priority int64) { defer q.mu.Unlock() k := q.indexer(data) - - q.identifiers[k] = struct{}{} - q.queue.Enqueue(data, priority) + version := q.identifiers[k] + 1 + q.identifiers[k] = version + q.queue.Enqueue(versionedItem[T]{data: data, version: version}, priority) } // Dequeue returns the next highest priority item, returning both @@ -53,16 +111,39 @@ func (q *HashedPriorityQueue[K, T]) Dequeue() *QueueItem[T] { q.mu.Lock() defer q.mu.Unlock() - item := q.queue.Dequeue() - if item == nil { - return nil + for { + item := q.queue.Dequeue() + if item == nil { + return nil + } + + if q.isLatestVersion(item.Value) { + k := q.indexer(item.Value.data) + delete(q.identifiers, k) + return q.unwrapQueueItem(item) + } } +} + +// Peek returns the next highest priority item without removing it from the queue. +// It returns nil if the queue is empty. +func (q *HashedPriorityQueue[K, T]) Peek() *QueueItem[T] { + q.mu.RLock() + defer q.mu.RUnlock() - // Find the key for the item and delete it from the presence map - k := q.indexer(item.Value) - delete(q.identifiers, k) + for { + item := q.queue.Peek() + if item == nil { + return nil + } - return item + if q.isLatestVersion(item.Value) { + return q.unwrapQueueItem(item) + } + + // If the peeked item is outdated, remove it and continue + q.queue.Dequeue() + } } // DequeueWhere allows the caller to iterate through the queue, in priority order, and @@ -74,26 +155,32 @@ func (q *HashedPriorityQueue[K, T]) DequeueWhere(matcher MatchingFunction[T]) *Q q.mu.Lock() defer q.mu.Unlock() - item := q.queue.DequeueWhere(matcher) - if item == nil { - return nil - } + for { + item := q.queue.DequeueWhere(func(vi versionedItem[T]) bool { + return matcher(vi.data) + }) - k := q.indexer(item.Value) - delete(q.identifiers, k) + if item == nil { + return nil + } - return item + if q.isLatestVersion(item.Value) { + k := q.indexer(item.Value.data) + delete(q.identifiers, k) + return q.unwrapQueueItem(item) + } + } } // Len returns the number of items currently in the queue func (q *HashedPriorityQueue[K, T]) Len() int { - return q.queue.Len() + return len(q.identifiers) } // IsEmpty returns a boolean denoting whether the queue is // currently empty or not. func (q *HashedPriorityQueue[K, T]) IsEmpty() bool { - return q.queue.Len() == 0 + return q.Len() == 0 } var _ PriorityQueueInterface[struct{}] = (*HashedPriorityQueue[string, struct{}])(nil) diff --git a/pkg/lib/collections/hashed_priority_queue_test.go b/pkg/lib/collections/hashed_priority_queue_test.go index 04340b939a..f5838fbe99 100644 --- a/pkg/lib/collections/hashed_priority_queue_test.go +++ b/pkg/lib/collections/hashed_priority_queue_test.go @@ -5,12 +5,21 @@ package collections_test import ( "testing" - "github.com/bacalhau-project/bacalhau/pkg/lib/collections" "github.com/stretchr/testify/suite" + + "github.com/bacalhau-project/bacalhau/pkg/lib/collections" ) type HashedPriorityQueueSuite struct { - suite.Suite + PriorityQueueTestSuite +} + +func (s *HashedPriorityQueueSuite) SetupTest() { + s.NewQueue = func() collections.PriorityQueueInterface[TestData] { + return collections.NewHashedPriorityQueue[string, TestData](func(t TestData) string { + return t.id + }) + } } func TestHashedPriorityQueueSuite(t *testing.T) { @@ -18,20 +27,120 @@ func TestHashedPriorityQueueSuite(t *testing.T) { } func (s *HashedPriorityQueueSuite) TestContains() { - type TestData struct { - id string - data int - } - - indexer := func(t TestData) string { - return t.id - } - - q := collections.NewHashedPriorityQueue[string, TestData](indexer) + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) s.Require().False(q.Contains("A")) - q.Enqueue(TestData{id: "A", data: 0}, 1) + q.Enqueue(TestData{"A", 0}, 1) s.Require().True(q.Contains("A")) _ = q.Dequeue() s.Require().False(q.Contains("A")) } + +func (s *HashedPriorityQueueSuite) TestPeek() { + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) + + q.Enqueue(TestData{"A", 1}, 1) + q.Enqueue(TestData{"B", 2}, 2) + + item := q.Peek() + s.Require().NotNil(item) + s.Require().Equal(TestData{"B", 2}, item.Value) + s.Require().True(q.Contains("A"), "Item A should still be in the queue after Peek") + s.Require().True(q.Contains("B"), "Item B should still be in the queue after Peek") + + _ = q.Dequeue() + s.Require().False(q.Contains("B"), "Item B should not be in the queue after Dequeue") + s.Require().True(q.Contains("A"), "Item A should still be in the queue after Dequeue") +} + +func (s *HashedPriorityQueueSuite) TestSingleItemPerKey() { + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) + + q.Enqueue(TestData{"A", 1}, 1) + q.Enqueue(TestData{"A", 2}, 2) + q.Enqueue(TestData{"A", 3}, 3) + + s.Require().Equal(1, q.Len(), "Queue should only contain one item for key 'A'") + + item := q.Dequeue() + s.Require().NotNil(item) + s.Require().Equal(TestData{"A", 3}, item.Value, "Should return the latest version of item 'A'") + s.Require().Equal(int64(3), item.Priority, "Should have the priority of the latest enqueue") + + s.Require().Nil(q.Dequeue(), "Queue should be empty after dequeuing the single item") +} + +func (s *HashedPriorityQueueSuite) TestPeekReturnsLatestVersion() { + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) + + q.Enqueue(TestData{"A", 1}, 1) + q.Enqueue(TestData{"B", 1}, 3) + q.Enqueue(TestData{"A", 2}, 2) + + item := q.Peek() + s.Require().NotNil(item) + s.Require().Equal(TestData{"B", 1}, item.Value, "Peek should return 'B' as it has the highest priority") + s.Require().Equal(int64(3), item.Priority) + + q.Enqueue(TestData{"B", 2}, 1) // Lower priority, but newer version + + item = q.Peek() + s.Require().NotNil(item) + s.Require().Equal(TestData{"A", 2}, item.Value, "Peek should now return 'A' as 'B' has lower priority") + s.Require().Equal(int64(2), item.Priority) +} + +func (s *HashedPriorityQueueSuite) TestDequeueWhereReturnsLatestVersion() { + q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData]) + + q.Enqueue(TestData{"A", 1}, 1) + q.Enqueue(TestData{"B", 1}, 2) + q.Enqueue(TestData{"A", 2}, 3) + + item := q.DequeueWhere(func(td TestData) bool { + return td.id == "A" + }) + + s.Require().NotNil(item) + s.Require().Equal(TestData{"A", 2}, item.Value, "DequeueWhere should return the latest version of 'A'") + s.Require().Equal(int64(3), item.Priority) + + s.Require().False(q.Contains("A"), "A should no longer be in the queue") + s.Require().True(q.Contains("B"), "B should still be in the queue") +} + +func (s *HashedPriorityQueueSuite) TestDuplicateKeys() { + inputs := []struct { + v TestData + p int64 + }{ + {TestData{"A", 1}, 3}, + {TestData{"B", 2}, 2}, + {TestData{"A", 3}, 1}, // Duplicate key with lower priority + {TestData{"C", 4}, 4}, + {TestData{"B", 5}, 5}, // Duplicate key with higher priority + } + + pq := s.NewQueue() + for _, tc := range inputs { + pq.Enqueue(tc.v, tc.p) + } + + expected := []struct { + v TestData + p int64 + }{ + {TestData{"B", 5}, 5}, + {TestData{"C", 4}, 4}, + {TestData{"A", 3}, 1}, + } + + for _, exp := range expected { + qitem := pq.Dequeue() + s.Require().NotNil(qitem) + s.Require().Equal(exp.v, qitem.Value) + s.Require().Equal(exp.p, qitem.Priority) + } + + s.Require().True(pq.IsEmpty()) +} diff --git a/pkg/lib/collections/priority_queue.go b/pkg/lib/collections/priority_queue.go index 72129a414e..8448111d3e 100644 --- a/pkg/lib/collections/priority_queue.go +++ b/pkg/lib/collections/priority_queue.go @@ -35,6 +35,10 @@ type PriorityQueueInterface[T any] interface { // extra PriorityQueue) for the dequeued items. DequeueWhere(matcher MatchingFunction[T]) *QueueItem[T] + // Peek returns the next highest priority item without removing it from the queue. + // It returns nil if the queue is empty. + Peek() *QueueItem[T] + // Len returns the number of items currently in the queue Len() int @@ -117,6 +121,22 @@ func (pq *PriorityQueue[T]) dequeue() *QueueItem[T] { return &QueueItem[T]{Value: item, Priority: heapItem.priority} } +// Peek returns the next highest priority item without removing it from the queue. +// It returns nil if the queue is empty. +func (pq *PriorityQueue[T]) Peek() *QueueItem[T] { + pq.mu.Lock() + defer pq.mu.Unlock() + + if pq.IsEmpty() { + return nil + } + + heapItem := pq.internalQueue[0] + item, _ := heapItem.value.(T) + + return &QueueItem[T]{Value: item, Priority: heapItem.priority} +} + // DequeueWhere allows the caller to iterate through the queue, in priority order, and // attempt to match an item using the provided `MatchingFunction`. This method has a high // time cost as dequeued but non-matching items must be held and requeued once the process diff --git a/pkg/lib/collections/priority_queue_base_test.go b/pkg/lib/collections/priority_queue_base_test.go new file mode 100644 index 0000000000..0a5db6f5cd --- /dev/null +++ b/pkg/lib/collections/priority_queue_base_test.go @@ -0,0 +1,153 @@ +//go:build unit || !integration + +package collections_test + +import ( + "github.com/stretchr/testify/suite" + + "github.com/bacalhau-project/bacalhau/pkg/lib/collections" +) + +type TestData struct { + id string + data int +} + +type PriorityQueueTestSuite struct { + suite.Suite + NewQueue func() collections.PriorityQueueInterface[TestData] +} + +func (s *PriorityQueueTestSuite) TestSimple() { + type testcase struct { + v TestData + p int64 + } + inputs := []testcase{ + {TestData{"B", 2}, 2}, {TestData{"A", 1}, 3}, {TestData{"C", 3}, 1}, + } + expected := []testcase{ + {TestData{"A", 1}, 3}, {TestData{"B", 2}, 2}, {TestData{"C", 3}, 1}, + } + + pq := s.NewQueue() + for _, tc := range inputs { + pq.Enqueue(tc.v, tc.p) + } + + for _, tc := range expected { + qitem := pq.Dequeue() + s.Require().NotNil(qitem) + s.Require().Equal(tc.v, qitem.Value) + s.Require().Equal(tc.p, qitem.Priority) + } + + s.Require().True(pq.IsEmpty()) +} + +func (s *PriorityQueueTestSuite) TestSimpleMin() { + type testcase struct { + v TestData + p int64 + } + inputs := []testcase{ + {TestData{"B", 2}, -2}, {TestData{"A", 1}, -3}, {TestData{"C", 3}, -1}, + } + expected := []testcase{ + {TestData{"C", 3}, -1}, {TestData{"B", 2}, -2}, {TestData{"A", 1}, -3}, + } + + pq := s.NewQueue() + for _, tc := range inputs { + pq.Enqueue(tc.v, tc.p) + } + + for _, tc := range expected { + qitem := pq.Dequeue() + s.Require().NotNil(qitem) + s.Require().Equal(tc.v, qitem.Value) + s.Require().Equal(tc.p, qitem.Priority) + } + + s.Require().True(pq.IsEmpty()) +} + +func (s *PriorityQueueTestSuite) TestEmpty() { + pq := s.NewQueue() + qitem := pq.Dequeue() + s.Require().Nil(qitem) + s.Require().True(pq.IsEmpty()) +} + +func (s *PriorityQueueTestSuite) TestDequeueWhere() { + pq := s.NewQueue() + pq.Enqueue(TestData{"A", 1}, 4) + pq.Enqueue(TestData{"D", 4}, 1) + pq.Enqueue(TestData{"D", 4}, 1) + pq.Enqueue(TestData{"D", 4}, 1) + pq.Enqueue(TestData{"D", 4}, 1) + pq.Enqueue(TestData{"B", 2}, 3) + pq.Enqueue(TestData{"C", 3}, 2) + + count := pq.Len() + + qitem := pq.DequeueWhere(func(possibleMatch TestData) bool { + return possibleMatch.id == "B" + }) + + s.Require().NotNil(qitem) + s.Require().Equal(TestData{"B", 2}, qitem.Value) + s.Require().Equal(int64(3), qitem.Priority) + s.Require().Equal(count-1, pq.Len()) +} + +func (s *PriorityQueueTestSuite) TestDequeueWhereFail() { + pq := s.NewQueue() + pq.Enqueue(TestData{"A", 1}, 4) + + qitem := pq.DequeueWhere(func(possibleMatch TestData) bool { + return possibleMatch.id == "Z" + }) + + s.Require().Nil(qitem) +} + +func (s *PriorityQueueTestSuite) TestPeek() { + pq := s.NewQueue() + + // Test 1: Peek on an empty queue + item := pq.Peek() + s.Require().Nil(item, "Peek on an empty queue should return nil") + + // Test 2: Peek after adding one item + pq.Enqueue(TestData{"A", 1}, 1) + item = pq.Peek() + s.Require().NotNil(item, "Peek should return an item") + s.Require().Equal(TestData{"A", 1}, item.Value, "Peek should return the correct value") + s.Require().Equal(int64(1), item.Priority, "Peek should return the correct priority") + s.Require().Equal(1, pq.Len(), "Peek should not remove the item from the queue") + + // Test 3: Peek with multiple items + pq.Enqueue(TestData{"B", 2}, 3) + pq.Enqueue(TestData{"C", 3}, 2) + item = pq.Peek() + s.Require().NotNil(item, "Peek should return an item") + s.Require().Equal(TestData{"B", 2}, item.Value, "Peek should return the highest priority item") + s.Require().Equal(int64(3), item.Priority, "Peek should return the correct priority") + s.Require().Equal(3, pq.Len(), "Peek should not remove any items from the queue") + + // Test 4: Peek after dequeue + dequeuedItem := pq.Dequeue() + s.Require().Equal(TestData{"B", 2}, dequeuedItem.Value, "Dequeue should return the highest priority item") + item = pq.Peek() + s.Require().NotNil(item, "Peek should return an item") + s.Require().Equal(TestData{"C", 3}, item.Value, "Peek should return the new highest priority item after dequeue") + s.Require().Equal(int64(2), item.Priority, "Peek should return the correct priority") + s.Require().Equal(2, pq.Len(), "Queue length should be reduced after dequeue") + + // Test 5: Multiple peeks should return the same item + item1 := pq.Peek() + item2 := pq.Peek() + s.Require().Equal(item1, item2, "Multiple peeks should return the same item") + s.Require().Equal(2, pq.Len(), "Multiple peeks should not change the queue length") +} diff --git a/pkg/lib/collections/priority_queue_test.go b/pkg/lib/collections/priority_queue_test.go index 9a2d115ab8..ec3adeb23d 100644 --- a/pkg/lib/collections/priority_queue_test.go +++ b/pkg/lib/collections/priority_queue_test.go @@ -5,109 +5,59 @@ package collections_test import ( "testing" - "github.com/bacalhau-project/bacalhau/pkg/lib/collections" "github.com/stretchr/testify/suite" + + "github.com/bacalhau-project/bacalhau/pkg/lib/collections" ) type PriorityQueueSuite struct { - suite.Suite + PriorityQueueTestSuite +} + +func (s *PriorityQueueSuite) SetupTest() { + s.NewQueue = func() collections.PriorityQueueInterface[TestData] { + return collections.NewPriorityQueue[TestData]() + } } func TestPriorityQueueSuite(t *testing.T) { suite.Run(t, new(PriorityQueueSuite)) } -func (s *PriorityQueueSuite) TestSimple() { - type testcase struct { - v string +func (s *PriorityQueueSuite) TestDuplicateKeys() { + inputs := []struct { + v TestData p int64 - } - inputs := []testcase{ - {"B", 2}, {"A", 3}, {"C", 1}, {"A", 3}, {"C", 1}, {"B", 2}, - } - expected := []testcase{ - {"A", 3}, {"A", 3}, {"B", 2}, {"B", 2}, {"C", 1}, {"C", 1}, + }{ + {TestData{"A", 1}, 3}, + {TestData{"B", 2}, 2}, + {TestData{"A", 3}, 1}, // Duplicate key with lower priority + {TestData{"C", 4}, 4}, + {TestData{"B", 5}, 5}, // Duplicate key with higher priority } - pq := collections.NewPriorityQueue[string]() + pq := s.NewQueue() for _, tc := range inputs { - pq.Enqueue(tc.v, int64(tc.p)) + pq.Enqueue(tc.v, tc.p) } - for _, tc := range expected { - qitem := pq.Dequeue() - s.Require().NotNil(qitem) - s.Require().Equal(tc.v, qitem.Value) - s.Require().Equal(tc.p, qitem.Priority) - } - - s.Require().True(pq.IsEmpty()) -} - -func (s *PriorityQueueSuite) TestSimpleMin() { - type testcase struct { - v string + expected := []struct { + v TestData p int64 - } - inputs := []testcase{ - {"B", -2}, {"A", -3}, {"C", -1}, {"A", -3}, {"C", -1}, {"B", -2}, - } - expected := []testcase{ - {"C", -1}, {"C", -1}, {"B", -2}, {"B", -2}, {"A", -3}, {"A", -3}, + }{ + {TestData{"B", 5}, 5}, + {TestData{"C", 4}, 4}, + {TestData{"A", 1}, 3}, + {TestData{"B", 2}, 2}, + {TestData{"A", 3}, 1}, } - pq := collections.NewPriorityQueue[string]() - for _, tc := range inputs { - pq.Enqueue(tc.v, int64(tc.p)) - } - - for _, tc := range expected { + for _, exp := range expected { qitem := pq.Dequeue() s.Require().NotNil(qitem) - s.Require().Equal(tc.v, qitem.Value) - s.Require().Equal(tc.p, qitem.Priority) + s.Require().Equal(exp.v, qitem.Value) + s.Require().Equal(exp.p, qitem.Priority) } s.Require().True(pq.IsEmpty()) } - -func (s *PriorityQueueSuite) TestEmpty() { - pq := collections.NewPriorityQueue[string]() - qitem := pq.Dequeue() - s.Require().Nil(qitem) - s.Require().True(pq.IsEmpty()) -} - -func (s *PriorityQueueSuite) TestDequeueWhere() { - pq := collections.NewPriorityQueue[string]() - pq.Enqueue("A", 4) - pq.Enqueue("D", 1) - pq.Enqueue("D", 1) - pq.Enqueue("D", 1) - pq.Enqueue("D", 1) - pq.Enqueue("B", 3) - pq.Enqueue("C", 2) - - count := pq.Len() - - qitem := pq.DequeueWhere(func(possibleMatch string) bool { - return possibleMatch == "B" - }) - - s.Require().NotNil(qitem) - s.Require().Equal("B", qitem.Value) - s.Require().Equal(int64(3), qitem.Priority) - s.Require().Equal(count-1, pq.Len()) - -} - -func (s *PriorityQueueSuite) TestDequeueWhereFail() { - pq := collections.NewPriorityQueue[string]() - pq.Enqueue("A", 4) - - qitem := pq.DequeueWhere(func(possibleMatch string) bool { - return possibleMatch == "Z" - }) - - s.Require().Nil(qitem) -} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index c053cd9b94..dc34a033c0 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -13,6 +13,7 @@ import ( "runtime/debug" "strconv" "strings" + "sync" "time" "github.com/rs/zerolog/pkgerrors" @@ -29,22 +30,31 @@ type LogMode string // Available logging modes const ( - LogModeDefault LogMode = "default" - LogModeStation LogMode = "station" - LogModeJSON LogMode = "json" - LogModeCombined LogMode = "combined" - LogModeEvent LogMode = "event" + LogModeDefault LogMode = "default" + LogModeJSON LogMode = "json" + LogModeCmd LogMode = "cmd" +) + +var ( + logMu sync.Mutex ) func ParseLogMode(s string) (LogMode, error) { - lm := []LogMode{LogModeDefault, LogModeStation, LogModeJSON, LogModeCombined, LogModeEvent} + lm := []LogMode{LogModeDefault, LogModeJSON, LogModeCmd} for _, logMode := range lm { if strings.ToLower(s) == strings.ToLower(string(logMode)) { return logMode, nil } } - return "Error", fmt.Errorf("%q is an invalid log-mode (valid modes: %q)", - s, lm) + return "", fmt.Errorf("%q is an invalid log-mode (valid modes: %q)", s, lm) +} + +func ParseLogLevel(s string) (zerolog.Level, error) { + l, err := zerolog.ParseLevel(s) + if err != nil { + return l, fmt.Errorf("%q is an invalid log-level", s) + } + return l, nil } var nodeIDFieldName = "NodeID" @@ -57,12 +67,14 @@ func init() { //nolint:gochecknoinits strings.HasSuffix(os.Args[0], ".test") || flag.Lookup("test.v") != nil || flag.Lookup("test.run") != nil { - configureLogging(zerolog.DebugLevel, defaultLogging()) + ConfigureLoggingLevel(zerolog.DebugLevel) + configureLogging(defaultLogging()) return } // the default log level when not running a test is ERROR - configureLogging(zerolog.ErrorLevel, bufferLogs()) + ConfigureLoggingLevel(zerolog.ErrorLevel) + configureLogging(bufferLogs()) } func ErrOrDebug(err error) zerolog.Level { @@ -82,46 +94,66 @@ type tTesting interface { func ConfigureTestLogging(t tTesting) { oldLogger := log.Logger oldContextLogger := zerolog.DefaultContextLogger - configureLogging(zerolog.DebugLevel, zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t), defaultLogFormat)) + ConfigureLoggingLevel(zerolog.DebugLevel) + configureLogging(zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t), defaultLogFormat)) t.Cleanup(func() { log.Logger = oldLogger zerolog.DefaultContextLogger = oldContextLogger }) } -func ConfigureLogging(modeStr, levelStr string) error { - logModeConfig := defaultLogging() - +func ParseAndConfigureLogging(modeStr, levelStr string) error { mode, err := ParseLogMode(modeStr) if err != nil { - return fmt.Errorf("invalid log mode: %w", err) + return err } - level, err := zerolog.ParseLevel(levelStr) + level, err := ParseLogLevel(levelStr) if err != nil { - return fmt.Errorf("invalid log level: %w", err) + return err } + ConfigureLogging(mode, level) + return nil +} + +func ConfigureLogging(mode LogMode, level zerolog.Level) { + var logWriter io.Writer switch mode { case LogModeDefault: - logModeConfig = defaultLogging() - case LogModeStation: - logModeConfig = defaultStationLogging() + logWriter = defaultLogging() case LogModeJSON: - logModeConfig = jsonLogging() - case LogModeEvent: - logModeConfig = eventLogging() - case LogModeCombined: - logModeConfig = combinedLogging() + logWriter = jsonLogging() + case LogModeCmd: + logWriter = clientLogging() + default: + logWriter = defaultLogging() } - configureLogging(level, logModeConfig) - LogBufferedLogs(logModeConfig) + ConfigureLoggingLevel(level) + configureLogging(logWriter) + LogBufferedLogs(logWriter) +} + +func ParseAndConfigureLoggingLevel(level string) error { + l, err := ParseLogLevel(level) + if err != nil { + return err + } + ConfigureLoggingLevel(l) return nil } -func configureLogging(level zerolog.Level, logWriter io.Writer) { - zerolog.TimeFieldFormat = time.RFC3339Nano +func ConfigureLoggingLevel(level zerolog.Level) { + logMu.Lock() + defer logMu.Unlock() zerolog.SetGlobalLevel(level) +} + +func configureLogging(logWriter io.Writer) { + logMu.Lock() + defer logMu.Unlock() + + zerolog.TimeFieldFormat = time.RFC3339Nano info, ok := debug.ReadBuildInfo() if ok && info.Main.Path != "" { @@ -148,21 +180,13 @@ func jsonLogging() io.Writer { return os.Stdout } -func eventLogging() io.Writer { - return io.Discard -} - -func combinedLogging() io.Writer { - return zerolog.MultiLevelWriter(defaultLogging(), os.Stdout) -} - func defaultLogging() io.Writer { return zerolog.NewConsoleWriter(defaultLogFormat) } func defaultLogFormat(w *zerolog.ConsoleWriter) { isTerminal := isatty.IsTerminal(os.Stdout.Fd()) - w.Out = os.Stderr + w.Out = os.Stdout w.NoColor = !isTerminal w.TimeFormat = "15:04:05.999 |" w.PartsOrder = []string{ @@ -187,25 +211,10 @@ func defaultLogFormat(w *zerolog.ConsoleWriter) { } } -func defaultStationLogging() io.Writer { +func clientLogging() io.Writer { return zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { - isTerminal := isatty.IsTerminal(os.Stdout.Fd()) - w.Out = os.Stdout - w.NoColor = !isTerminal - w.PartsOrder = []string{ - zerolog.LevelFieldName, - zerolog.MessageFieldName, - } - - w.FormatLevel = func(i interface{}) string { - return strings.ToUpper(i.(string)) + ":" - } - w.FormatErrFieldName = func(i interface{}) string { - return "- " - } - w.FormatErrFieldValue = func(i interface{}) string { - return strings.Trim(i.(string), "\"") - } + defaultLogFormat(w) + w.PartsOrder = []string{zerolog.MessageFieldName} }) } diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index ac25c6536d..7786c4b979 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -23,7 +23,8 @@ func TestConfigureLogging(t *testing.T) { }) var logging strings.Builder - configureLogging(zerolog.InfoLevel, zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { + ConfigureLoggingLevel(zerolog.InfoLevel) + configureLogging(zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { defaultLogFormat(w) w.Out = &logging w.NoColor = true @@ -40,3 +41,66 @@ func TestConfigureLogging(t *testing.T) { assert.Contains(t, actual, "pkg/logger/testpackage/subpackage/subsubpackage/testutil.go", "Log statement doesn't contain the full package path") assert.Contains(t, actual, `stack:[{"func":"TestLog","line":`, "Log statement didn't automatically include the error's stacktrace") } + +func TestParseAndConfigureLogging(t *testing.T) { + err := ParseAndConfigureLogging("default", "debug") + assert.NoError(t, err) + assert.Equal(t, zerolog.DebugLevel, zerolog.GlobalLevel()) + + err = ParseAndConfigureLogging("json", "info") + assert.NoError(t, err) + assert.Equal(t, zerolog.InfoLevel, zerolog.GlobalLevel()) + + err = ParseAndConfigureLogging("invalid", "error") + assert.Error(t, err) + + err = ParseAndConfigureLogging("default", "invalid") + assert.Error(t, err) +} + +func TestParseLogMode(t *testing.T) { + tests := []struct { + input string + expected LogMode + hasError bool + }{ + {"default", LogModeDefault, false}, + {"json", LogModeJSON, false}, + {"cmd", LogModeCmd, false}, + {"invalid", "", true}, + } + + for _, test := range tests { + result, err := ParseLogMode(test.input) + if test.hasError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, result) + } + } +} + +func TestParseLogLevel(t *testing.T) { + tests := []struct { + input string + expected zerolog.Level + hasError bool + }{ + {"debug", zerolog.DebugLevel, false}, + {"info", zerolog.InfoLevel, false}, + {"warn", zerolog.WarnLevel, false}, + {"error", zerolog.ErrorLevel, false}, + {"invalid", zerolog.NoLevel, true}, + } + + for _, test := range tests { + result, err := ParseLogLevel(test.input) + if test.hasError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, result) + } + } +} diff --git a/pkg/node/heartbeat/heartbeat_test.go b/pkg/node/heartbeat/heartbeat_test.go index e9b5743620..a1a814cf66 100644 --- a/pkg/node/heartbeat/heartbeat_test.go +++ b/pkg/node/heartbeat/heartbeat_test.go @@ -6,12 +6,14 @@ import ( "context" "fmt" "strconv" + "sync" "testing" "time" "github.com/benbjohnson/clock" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/bacalhau-project/bacalhau/pkg/lib/ncl" @@ -226,3 +228,122 @@ func (s *HeartbeatTestSuite) TestSendHeartbeatError() { err = client.SendHeartbeat(ctx, 1) s.Error(err) } + +func (s *HeartbeatTestSuite) TestConcurrentHeartbeats() { + ctx := context.Background() + numNodes := 10 + numHeartbeatsPerNode := 100 + + var wg sync.WaitGroup + wg.Add(numNodes) + + for i := 0; i < numNodes; i++ { + go func(nodeID string) { + defer wg.Done() + client, err := NewClient(s.natsConn, nodeID, s.publisher) + require.NoError(s.T(), err) + + for j := 0; j < numHeartbeatsPerNode; j++ { + s.Require().NoError(client.SendHeartbeat(ctx, uint64(j))) + time.Sleep(time.Millisecond) // Small delay to simulate real-world scenario + } + }(fmt.Sprintf("node-%d", i)) + } + + wg.Wait() + + // Allow time for all heartbeats to be processed + time.Sleep(100 * time.Millisecond) + + // Verify that all nodes are marked as HEALTHY + for i := 0; i < numNodes; i++ { + nodeID := fmt.Sprintf("node-%d", i) + nodeState := &models.NodeState{Info: models.NodeInfo{NodeID: nodeID}} + s.heartbeatServer.UpdateNodeInfo(nodeState) + s.Require().Equal(models.NodeStates.HEALTHY, nodeState.Connection) + } +} + +func (s *HeartbeatTestSuite) TestConcurrentHeartbeatsWithDisconnection() { + ctx := context.Background() + numNodes := 5 + numHeartbeatsPerNode := 50 + + var wg sync.WaitGroup + wg.Add(numNodes) + + for i := 0; i < numNodes; i++ { + go func(nodeID string) { + defer wg.Done() + client, err := NewClient(s.natsConn, nodeID, s.publisher) + require.NoError(s.T(), err) + + for j := 0; j < numHeartbeatsPerNode; j++ { + s.Require().NoError(client.SendHeartbeat(ctx, uint64(j))) + time.Sleep(time.Millisecond) + + if j == numHeartbeatsPerNode/2 { + // Simulate a disconnection by advancing the clock + s.clock.Add(10 * time.Second) + } + } + }(fmt.Sprintf("node-%d", i)) + } + + wg.Wait() + + // Allow time for all heartbeats to be processed + time.Sleep(100 * time.Millisecond) + + // Verify node states + for i := 0; i < numNodes; i++ { + nodeID := fmt.Sprintf("node-%d", i) + nodeState := &models.NodeState{Info: models.NodeInfo{NodeID: nodeID}} + s.heartbeatServer.UpdateNodeInfo(nodeState) + + // The exact state might vary depending on timing, but it should be either HEALTHY or DISCONNECTED + s.Require().Contains([]models.NodeConnectionState{models.NodeStates.HEALTHY, models.NodeStates.DISCONNECTED}, nodeState.Connection) + } +} + +func (s *HeartbeatTestSuite) TestConcurrentHeartbeatsAndChecks() { + ctx := context.Background() + numNodes := 5 + numHeartbeatsPerNode := 30 + checkInterval := 50 * time.Millisecond + + var wg sync.WaitGroup + wg.Add(numNodes + 1) // +1 for the checker goroutine + + // Start the checker goroutine + go func() { + defer wg.Done() + for i := 0; i < numHeartbeatsPerNode; i++ { + s.heartbeatServer.checkQueue(ctx) + time.Sleep(checkInterval) + } + }() + + for i := 0; i < numNodes; i++ { + go func(nodeID string) { + defer wg.Done() + client, err := NewClient(s.natsConn, nodeID, s.publisher) + require.NoError(s.T(), err) + + for j := 0; j < numHeartbeatsPerNode; j++ { + s.Require().NoError(client.SendHeartbeat(ctx, uint64(j))) + time.Sleep(checkInterval / 2) // Send heartbeats faster than checks + } + }(fmt.Sprintf("node-%d", i)) + } + + wg.Wait() + + // Verify final node states + for i := 0; i < numNodes; i++ { + nodeID := fmt.Sprintf("node-%d", i) + nodeState := &models.NodeState{Info: models.NodeInfo{NodeID: nodeID}} + s.heartbeatServer.UpdateNodeInfo(nodeState) + s.Require().Equal(models.NodeStates.HEALTHY, nodeState.Connection) + } +} diff --git a/pkg/node/heartbeat/server.go b/pkg/node/heartbeat/server.go index 5d77751ff3..ef3d901f69 100644 --- a/pkg/node/heartbeat/server.go +++ b/pkg/node/heartbeat/server.go @@ -113,7 +113,7 @@ func (h *HeartbeatServer) Start(ctx context.Context) error { case <-ctx.Done(): return case <-ticker.C: - h.CheckQueue(ctx) + h.checkQueue(ctx) } } }(ctx) @@ -125,34 +125,49 @@ func (h *HeartbeatServer) Start(ctx context.Context) error { return nil } -// CheckQueue will check the queue for old heartbeats that might make a node's +// checkQueue will check the queue for old heartbeats that might make a node's // liveness either unhealthy or unknown, and will update the node's status accordingly. -func (h *HeartbeatServer) CheckQueue(ctx context.Context) { - // These are the timestamps, below which we'll consider the item in one of those two - // states - nowStamp := h.clock.Now().UTC().Unix() - disconnectedUnder := nowStamp - int64(h.disconnectedAfter.Seconds()) +// This method is not thread-safe and should be called from a single goroutine. +func (h *HeartbeatServer) checkQueue(ctx context.Context) { + // Calculate the timestamp threshold for considering a node as disconnected + disconnectedUnder := h.clock.Now().Add(-h.disconnectedAfter).UTC().Unix() for { - // Dequeue anything older than the unknown timestamp - item := h.pqueue.DequeueWhere(func(item TimestampedHeartbeat) bool { - return item.Timestamp < disconnectedUnder - }) - - // We haven't found anything old enough yet. We can stop the loop and wait - // for the next cycle. - if item == nil { + // Peek at the next (oldest) item in the queue + peek := h.pqueue.Peek() + + // If the queue is empty, we're done + if peek == nil { + break + } + + // If the oldest item is recent enough, we're done + log.Ctx(ctx).Trace(). + Dur("LastHeartbeatAge", h.clock.Now().Sub(time.Unix(peek.Value.Timestamp, 0))). + Msgf("Peeked at %+v", peek) + if peek.Value.Timestamp >= disconnectedUnder { break } + // Dequeue the item and mark the node as disconnected + item := h.pqueue.Dequeue() + if item == nil || item.Value.Timestamp >= disconnectedUnder { + // This should never happen, but we'll check just in case + log.Warn().Msgf("Unexpected item dequeued: %+v didn't match previously peeked item: %+v", item, peek) + continue + } + if item.Value.NodeID == h.nodeID { // We don't want to mark ourselves as disconnected continue } - if item.Value.Timestamp < disconnectedUnder { - h.markNodeAs(item.Value.NodeID, models.NodeStates.DISCONNECTED) - } + log.Ctx(ctx).Debug(). + Str("NodeID", item.Value.NodeID). + Int64("LastHeartbeat", item.Value.Timestamp). + Dur("LastHeartbeatAge", h.clock.Now().Sub(time.Unix(item.Value.Timestamp, 0))). + Msg("Marking node as disconnected") + h.markNodeAs(item.Value.NodeID, models.NodeStates.DISCONNECTED) } } @@ -200,36 +215,14 @@ func (h *HeartbeatServer) ShouldProcess(ctx context.Context, message *ncl.Messag // Handle will handle a message received through the legacy heartbeat topic func (h *HeartbeatServer) Handle(ctx context.Context, heartbeat Heartbeat) error { - log.Ctx(ctx).Trace().Msgf("heartbeat received from %s", heartbeat.NodeID) - timestamp := h.clock.Now().UTC().Unix() + th := TimestampedHeartbeat{Heartbeat: heartbeat, Timestamp: timestamp} + log.Ctx(ctx).Trace().Msgf("Enqueueing heartbeat from %s with seq %d. %+v", th.NodeID, th.Sequence, th) - if h.pqueue.Contains(heartbeat.NodeID) { - // If we think we already have a heartbeat from this node, we'll update the - // timestamp of the entry so it is re-prioritized in the queue by dequeuing - // and re-enqueuing it (this will ensure it is heapified correctly). - result := h.pqueue.DequeueWhere(func(item TimestampedHeartbeat) bool { - return item.NodeID == heartbeat.NodeID - }) - - if result == nil { - log.Ctx(ctx).Warn().Msgf("consistency error in heartbeat heap, node %s not found", heartbeat.NodeID) - return nil - } - - log.Ctx(ctx).Trace().Msgf("Re-enqueueing heartbeat from %s", heartbeat.NodeID) - result.Value.Timestamp = timestamp - h.pqueue.Enqueue(result.Value, timestamp) - } else { - log.Ctx(ctx).Trace().Msgf("Enqueueing heartbeat from %s", heartbeat.NodeID) - - // We'll enqueue the heartbeat message with the current timestamp. The older - // the entry, the lower the timestamp (trending to 0) and the higher the priority. - h.pqueue.Enqueue(TimestampedHeartbeat{Heartbeat: heartbeat, Timestamp: timestamp}, timestamp) - } - + // We'll enqueue the heartbeat message with the current timestamp in reverse priority so that + // older heartbeats are dequeued first. + h.pqueue.Enqueue(th, -timestamp) h.markNodeAs(heartbeat.NodeID, models.NodeStates.HEALTHY) - return nil } diff --git a/pkg/publicapi/apimodels/error.go b/pkg/publicapi/apimodels/error.go index 6d2bc98bfd..842fe5734e 100644 --- a/pkg/publicapi/apimodels/error.go +++ b/pkg/publicapi/apimodels/error.go @@ -2,7 +2,6 @@ package apimodels import ( "encoding/json" - "errors" "fmt" "io" "net/http" @@ -67,23 +66,29 @@ func (e *APIError) Error() string { } // Parse HTTP Resposne to APIError -func FromHttpResponse(resp *http.Response) (*APIError, error) { - +func GenerateAPIErrorFromHTTPResponse(resp *http.Response) *APIError { if resp == nil { - return nil, errors.New("response is nil, cannot be unmarsheld to APIError") + return NewAPIError(0, "API call error, invalid response") } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("error reading response body: %w", err) + return NewAPIError( + resp.StatusCode, + fmt.Sprintf("Unable to read API call response body. Error: %q", err.Error())) } var apiErr APIError err = json.Unmarshal(body, &apiErr) if err != nil { - return nil, fmt.Errorf("error parsing response body: %w", err) + return NewAPIError( + resp.StatusCode, + fmt.Sprintf("Unable to parse API call response body. Error: %q. Body received: %q", + err.Error(), + string(body), + )) } // If the JSON didn't include a status code, use the HTTP Status @@ -91,7 +96,7 @@ func FromHttpResponse(resp *http.Response) (*APIError, error) { apiErr.HTTPStatusCode = resp.StatusCode } - return &apiErr, nil + return &apiErr } // FromBacError converts a bacerror.Error to an APIError diff --git a/pkg/publicapi/client/v2/client.go b/pkg/publicapi/client/v2/client.go index 18f687ed34..95979028be 100644 --- a/pkg/publicapi/client/v2/client.go +++ b/pkg/publicapi/client/v2/client.go @@ -74,18 +74,12 @@ func (c *httpClient) Get(ctx context.Context, endpoint string, in apimodels.GetR return apimodels.NewUnauthorizedError("invalid token") } - var apiError *apimodels.APIError if resp.StatusCode != http.StatusOK { - apiError, err = apimodels.FromHttpResponse(resp) - if err != nil { - return err + if apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp); apiError != nil { + return apiError } } - if apiError != nil { - return apiError - } - defer resp.Body.Close() if out != nil { @@ -116,18 +110,12 @@ func (c *httpClient) write(ctx context.Context, verb, endpoint string, in apimod return apimodels.ErrInvalidToken } - var apiError *apimodels.APIError if resp.StatusCode != http.StatusOK { - apiError, err = apimodels.FromHttpResponse(resp) - if err != nil { - return err + if apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp); apiError != nil { + return apiError } } - if apiError != nil { - return apiError - } - if out != nil { if err := decodeBody(resp, &out); err != nil { return err @@ -362,12 +350,13 @@ func (c *httpClient) interceptError(ctx context.Context, err error, resp *http.R WithCode(bacerrors.UnauthorizedError) } - apiError, apiErr := apimodels.FromHttpResponse(resp) - if apiErr == nil { + apiError := apimodels.GenerateAPIErrorFromHTTPResponse(resp) + if apiError != nil { return apiError.ToBacError() } - return bacerrors.Wrap(apiErr, "server error"). + return bacerrors.New("server error"). + WithHTTPStatusCode(http.StatusInternalServerError). WithCode(bacerrors.InternalError) } diff --git a/pkg/setup/setup.go b/pkg/setup/setup.go index 345268831e..d270600bec 100644 --- a/pkg/setup/setup.go +++ b/pkg/setup/setup.go @@ -9,7 +9,6 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/config" "github.com/bacalhau-project/bacalhau/pkg/config/types" - "github.com/bacalhau-project/bacalhau/pkg/logger" "github.com/bacalhau-project/bacalhau/pkg/repo/migrations" "github.com/bacalhau-project/bacalhau/pkg/repo" @@ -25,9 +24,6 @@ func SetupMigrationManager() (*repo.MigrationManager, error) { // SetupBacalhauRepo ensures that a bacalhau repo and config exist and are initialized. func SetupBacalhauRepo(cfg types.Bacalhau) (*repo.FsRepo, error) { - if err := logger.ConfigureLogging(cfg.Logging.Mode, cfg.Logging.Level); err != nil { - return nil, fmt.Errorf("failed to configure logging: %w", err) - } migrationManger, err := SetupMigrationManager() if err != nil { return nil, fmt.Errorf("failed to create migration manager: %w", err) diff --git a/webui/package.json b/webui/package.json index 6f4ad63331..114b626a3a 100644 --- a/webui/package.json +++ b/webui/package.json @@ -27,7 +27,7 @@ "class-variance-authority": "^0.7.0", "clsx": "^2.1.1", "lucide-react": "^0.438.0", - "next": "14.2.7", + "next": "14.2.10", "next-themes": "^0.3.0", "react": "^18", "react-dom": "^18", diff --git a/webui/yarn.lock b/webui/yarn.lock index f645055fda..bbbe05d0bc 100644 --- a/webui/yarn.lock +++ b/webui/yarn.lock @@ -223,10 +223,10 @@ __metadata: languageName: node linkType: hard -"@next/env@npm:14.2.7": - version: 14.2.7 - resolution: "@next/env@npm:14.2.7" - checksum: 10c0/1cda023007acda4d47036a25fba0e039d9b2df9c3770651dc289207e0537506675546c02b5b574fe92bb1adc1c887d948d5cb630673aa572754278b82d150b7e +"@next/env@npm:14.2.10": + version: 14.2.10 + resolution: "@next/env@npm:14.2.10" + checksum: 10c0/e13ad3bb18f576a62a011f08393bd20cf025c3e3aa378d9df5c10f53b63a6eacd43b47aee00ffc8b2eb7988e4af58a1e1504a8e115aad753b35f3ee1cdbbc37a languageName: node linkType: hard @@ -239,65 +239,65 @@ __metadata: languageName: node linkType: hard -"@next/swc-darwin-arm64@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-darwin-arm64@npm:14.2.7" +"@next/swc-darwin-arm64@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-darwin-arm64@npm:14.2.10" conditions: os=darwin & cpu=arm64 languageName: node linkType: hard -"@next/swc-darwin-x64@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-darwin-x64@npm:14.2.7" +"@next/swc-darwin-x64@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-darwin-x64@npm:14.2.10" conditions: os=darwin & cpu=x64 languageName: node linkType: hard -"@next/swc-linux-arm64-gnu@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-linux-arm64-gnu@npm:14.2.7" +"@next/swc-linux-arm64-gnu@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-linux-arm64-gnu@npm:14.2.10" conditions: os=linux & cpu=arm64 & libc=glibc languageName: node linkType: hard -"@next/swc-linux-arm64-musl@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-linux-arm64-musl@npm:14.2.7" +"@next/swc-linux-arm64-musl@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-linux-arm64-musl@npm:14.2.10" conditions: os=linux & cpu=arm64 & libc=musl languageName: node linkType: hard -"@next/swc-linux-x64-gnu@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-linux-x64-gnu@npm:14.2.7" +"@next/swc-linux-x64-gnu@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-linux-x64-gnu@npm:14.2.10" conditions: os=linux & cpu=x64 & libc=glibc languageName: node linkType: hard -"@next/swc-linux-x64-musl@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-linux-x64-musl@npm:14.2.7" +"@next/swc-linux-x64-musl@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-linux-x64-musl@npm:14.2.10" conditions: os=linux & cpu=x64 & libc=musl languageName: node linkType: hard -"@next/swc-win32-arm64-msvc@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-win32-arm64-msvc@npm:14.2.7" +"@next/swc-win32-arm64-msvc@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-win32-arm64-msvc@npm:14.2.10" conditions: os=win32 & cpu=arm64 languageName: node linkType: hard -"@next/swc-win32-ia32-msvc@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-win32-ia32-msvc@npm:14.2.7" +"@next/swc-win32-ia32-msvc@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-win32-ia32-msvc@npm:14.2.10" conditions: os=win32 & cpu=ia32 languageName: node linkType: hard -"@next/swc-win32-x64-msvc@npm:14.2.7": - version: 14.2.7 - resolution: "@next/swc-win32-x64-msvc@npm:14.2.7" +"@next/swc-win32-x64-msvc@npm:14.2.10": + version: 14.2.10 + resolution: "@next/swc-win32-x64-msvc@npm:14.2.10" conditions: os=win32 & cpu=x64 languageName: node linkType: hard @@ -4071,20 +4071,20 @@ __metadata: languageName: node linkType: hard -"next@npm:14.2.7": - version: 14.2.7 - resolution: "next@npm:14.2.7" - dependencies: - "@next/env": "npm:14.2.7" - "@next/swc-darwin-arm64": "npm:14.2.7" - "@next/swc-darwin-x64": "npm:14.2.7" - "@next/swc-linux-arm64-gnu": "npm:14.2.7" - "@next/swc-linux-arm64-musl": "npm:14.2.7" - "@next/swc-linux-x64-gnu": "npm:14.2.7" - "@next/swc-linux-x64-musl": "npm:14.2.7" - "@next/swc-win32-arm64-msvc": "npm:14.2.7" - "@next/swc-win32-ia32-msvc": "npm:14.2.7" - "@next/swc-win32-x64-msvc": "npm:14.2.7" +"next@npm:14.2.10": + version: 14.2.10 + resolution: "next@npm:14.2.10" + dependencies: + "@next/env": "npm:14.2.10" + "@next/swc-darwin-arm64": "npm:14.2.10" + "@next/swc-darwin-x64": "npm:14.2.10" + "@next/swc-linux-arm64-gnu": "npm:14.2.10" + "@next/swc-linux-arm64-musl": "npm:14.2.10" + "@next/swc-linux-x64-gnu": "npm:14.2.10" + "@next/swc-linux-x64-musl": "npm:14.2.10" + "@next/swc-win32-arm64-msvc": "npm:14.2.10" + "@next/swc-win32-ia32-msvc": "npm:14.2.10" + "@next/swc-win32-x64-msvc": "npm:14.2.10" "@swc/helpers": "npm:0.5.5" busboy: "npm:1.6.0" caniuse-lite: "npm:^1.0.30001579" @@ -4125,7 +4125,7 @@ __metadata: optional: true bin: next: dist/bin/next - checksum: 10c0/661ff5196f671d68ece76f3003d049848163cb21a946fe71673225e56f82726f647a1869085f010869a778e56dd601b9065221031823562aeb92026677f8a5fd + checksum: 10c0/a64991d44db2b6dcb3e7b780f14fe138fa97052767cf96a7733d1de4a857834e19a31fd5a742cca14201ad800bf03924d613e3d4e99932a1112bb9a03662f95a languageName: node linkType: hard @@ -5737,7 +5737,7 @@ __metadata: eslint-config-prettier: "npm:^9.1.0" eslint-plugin-prettier: "npm:^5.2.1" lucide-react: "npm:^0.438.0" - next: "npm:14.2.7" + next: "npm:14.2.10" next-themes: "npm:^0.3.0" openapi-typescript-codegen: "npm:^0.29.0" postcss: "npm:^8"