Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into ud/bashtub
Browse files Browse the repository at this point in the history
  • Loading branch information
udsamani committed Oct 9, 2024
2 parents 409ae9e + 764831d commit c1ec5c0
Show file tree
Hide file tree
Showing 24 changed files with 829 additions and 327 deletions.
2 changes: 1 addition & 1 deletion cmd/cli/config/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Each key shown can be used with:
if err != nil {
return err
}
log.Info().Msgf("Config loaded from: %s, and with data-dir %s", cfg.Paths(), cfg.Get(types.DataDirKey))
log.Debug().Msgf("Config loaded from: %s, and with data-dir %s", cfg.Paths(), cfg.Get(types.DataDirKey))
return list(cmd, cfg, o)
},
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/cli/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"strconv"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
"k8s.io/kubectl/pkg/util/i18n"
Expand Down Expand Up @@ -103,9 +104,7 @@ func NewCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, _ []string) error {
// TODO: a hack to force debug logging for devstack
// until I figure out why flags and env vars are not working
if err := logger.ConfigureLogging(string(logger.LogModeDefault), "debug"); err != nil {
return fmt.Errorf("failed to configure logging: %w", err)
}
logger.ConfigureLogging(logger.LogModeDefault, zerolog.DebugLevel)
return runDevstack(cmd, ODs)
},
}
Expand Down
6 changes: 1 addition & 5 deletions cmd/cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,11 @@ func NewRootCmd() *cobra.Command {
// While we allow users to configure logging via the config file, they are applied
// and will override this configuration at a later stage when the config is loaded.
// This is needed to ensure any logs before the config is loaded are captured.
logMode := viper.GetString(types.LoggingModeKey)
if logMode == "" {
logMode = string(logger.LogModeDefault)
}
logLevel := viper.GetString(types.LoggingLevelKey)
if logLevel == "" {
logLevel = "Info"
}
if err := logger.ConfigureLogging(logMode, logLevel); err != nil {
if err := logger.ParseAndConfigureLogging(string(logger.LogModeCmd), logLevel); err != nil {
return fmt.Errorf("failed to configure logging: %w", err)
}

Expand Down
5 changes: 4 additions & 1 deletion cmd/cli/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func NewCmd() *cobra.Command {
return fmt.Errorf("failed to setup config: %w", err)
}

if err = logger.ParseAndConfigureLogging(cfg.Logging.Mode, cfg.Logging.Level); err != nil {
return fmt.Errorf("failed to configure logging: %w", err)
}

log.Info().Msgf("Config loaded from: %s, and with data-dir %s",
rawCfg.Paths(), rawCfg.Get(types.DataDirKey))

Expand All @@ -103,7 +107,6 @@ func NewCmd() *cobra.Command {
if err != nil {
return fmt.Errorf("failed to reconcile repo: %w", err)
}

return serve(cmd, cfg, fsr)
},
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/util/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/bacalhau-project/bacalhau/cmd/util/hook"
"github.com/bacalhau-project/bacalhau/pkg/config"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/logger"
"github.com/bacalhau-project/bacalhau/pkg/repo"
"github.com/bacalhau-project/bacalhau/pkg/setup"
)
Expand Down Expand Up @@ -75,6 +76,12 @@ func SetupConfigType(cmd *cobra.Command) (*config.Config, error) {
if err != nil {
return nil, err
}

// We always apply the configured logging level. Logging mode on the other hand is only applied with serve cmd
if err = logger.ParseAndConfigureLoggingLevel(cfg.Get(types.LoggingLevelKey).(string)); err != nil {
return nil, fmt.Errorf("failed to configure logging: %w", err)
}

return cfg, nil
}

Expand Down
4 changes: 2 additions & 2 deletions ops/terraform/dev.tfvars
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
bacalhau_version = "v1.4.0"
bacalhau_version = "v1.5.0"
bacalhau_branch = ""
bacalhau_port = "1235"
bacalhau_environment = "development"
ipfs_version = "v0.12.2"
ipfs_version = "v0.18.1"
gcp_project = "bacalhau-dev"
grafana_cloud_prometheus_user = "14299"
grafana_cloud_prometheus_endpoint = "https://prometheus-us-central1.grafana.net/api/prom/push"
Expand Down
7 changes: 3 additions & 4 deletions ops/terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ export GRAFANA_CLOUD_TEMPO_ENDPOINT="${var.grafana_cloud_tempo_endpoint}"
export OTEL_COLLECTOR_VERSION="${var.otel_collector_version}"
export OTEL_EXPORTER_OTLP_ENDPOINT="${var.otel_collector_endpoint}"
export OTEL_RESOURCE_ATTRIBUTES="deployment.environment=${terraform.workspace}"
export BACALHAU_NODE_NETWORK_ORCHESTRATORS="${var.internal_ip_addresses[0]}:4222"
export BACALHAU_NODE_NETWORK_ADVERTISEDADDRESS="${var.public_ip_addresses[count.index]}:4222"
export BACALHAU_NODE_NETWORK_CLUSTER_PEERS=""
export BACALHAU_ORCHESTRATORS="${var.internal_ip_addresses[0]}:4222"
export BACALHAU_ORCHESTRATOR_ADVERTISE="${var.public_ip_addresses[count.index]}:4222"
export BACALHAU_LOCAL_PUBLISHER_ADDRESS="${var.public_ip_addresses[count.index]}"
export BACALHAU_WEBUI_BACKEND="http://${var.public_ip_addresses[0]}:1234"
### secrets are installed in the install-node.sh script
export SECRETS_GRAFANA_CLOUD_PROMETHEUS_API_KEY="${var.grafana_cloud_prometheus_api_key}"
Expand Down
4 changes: 2 additions & 2 deletions ops/terraform/prod.tfvars
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
bacalhau_version = "v1.4.0"
bacalhau_version = "v1.5.0"
bacalhau_port = "1235"
bacalhau_environment = "production"
ipfs_version = "v0.12.2"
ipfs_version = "v0.18.1"
gcp_project = "bacalhau-prod"
grafana_cloud_prometheus_user = "1008771"
grafana_cloud_prometheus_endpoint = "https://prometheus-us-central1.grafana.net/api/prom/push"
Expand Down
18 changes: 10 additions & 8 deletions ops/terraform/remote_files/scripts/start-bacalhau.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ bacalhau serve \
--job-execution-timeout-bypass-client-id="${TRUSTED_CLIENT_IDS}" \
--ipfs-connect /ip4/127.0.0.1/tcp/5001 \
--api-port 1234 \
--web-ui="${BACALHAU_NODE_WEBUI}" \
--web-ui-port 80 \
--labels owner=bacalhau \
--requester-job-translation-enabled \
--config Job.Defaults.Batch.Task.Publisher.Type=local \
--config Job.Defaults.Ops.Task.Publisher.Type=local \
--config Job.Defaults.Service.Task.Publisher.Type=local \
--config Job.Defaults.Daemon.Task.Publisher.Type=local \
--local-publisher-address "${BACALHAU_LOCAL_PUBLISHER_ADDRESS}"
--config DisableAnalytics \
--config labels="owner=bacalhau,name=node-${TERRAFORM_NODE_INDEX}"\
--config Compute.Orchestrators="${BACALHAU_ORCHESTRATORS}" \
--config Orchestrator.Advertise="${BACALHAU_ORCHESTRATOR_ADVERTISE}" \
--config WebUI.Enabled="${BACALHAU_NODE_WEBUI}" \
--config WebUI.Listen=0.0.0.0:80 \
--config WebUI.Backend="${BACALHAU_WEBUI_BACKEND}" \
--config JobDefaults.Batch.Task.Publisher.Type=local \
--config JobDefaults.Ops.Task.Publisher.Type=local \
--config Publishers.Types.Local.Address="${BACALHAU_LOCAL_PUBLISHER_ADDRESS}"
4 changes: 2 additions & 2 deletions ops/terraform/stage.tfvars
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
bacalhau_version = "v1.5.0-dev2"
bacalhau_version = "v1.5.0"
bacalhau_branch = "" # deploy from a branch instead of the version above
bacalhau_port = "1235"
bacalhau_environment = "staging"
ipfs_version = "v0.12.2"
ipfs_version = "v0.18.1"
gcp_project = "bacalhau-stage"
grafana_cloud_prometheus_user = "1008771"
grafana_cloud_prometheus_endpoint = "https://prometheus-us-central1.grafana.net/api/prom/push"
Expand Down
137 changes: 112 additions & 25 deletions pkg/lib/collections/hashed_priority_queue.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,60 @@
package collections

import "sync"

import (
"sync"
)

// HashedPriorityQueue is a priority queue that maintains only a single item per unique key.
// It combines the functionality of a hash map and a priority queue to provide efficient
// operations with the following key characteristics:
//
// 1. Single Item Per Key: The queue maintains only the latest version of an item for each
// unique key. When a new item with an existing key is enqueued, it replaces the old item
// instead of adding a duplicate.
//
// 2. Lazy Dequeuing: Outdated items (those that have been replaced by a newer version) are
// not immediately removed from the underlying queue. Instead, they are filtered out
// during dequeue operations. This approach improves enqueue performance
// at the cost of potentially slower dequeue operations.
//
// 3. Higher Enqueue Throughput: By avoiding immediate removal of outdated items, the
// HashedPriorityQueue achieves higher enqueue throughput. This makes it particularly
// suitable for scenarios with frequent updates to existing items.
//
// 4. Eventually Consistent: The queue becomes consistent over time as outdated items are
// lazily removed during dequeue operations. This means that the queue's length and the
// items it contains become accurate as items are dequeued.
//
// 5. Memory Consideration: Due to the lazy removal of outdated items, the underlying queue
// may temporarily hold more items than there are unique keys. This trade-off allows for
// better performance but may use more memory compared to a strictly consistent queue.
//
// Use HashedPriorityQueue when you need a priority queue that efficiently handles updates
// to existing items and can tolerate some latency in removing outdated entries in favor
// of higher enqueue performance.
type HashedPriorityQueue[K comparable, T any] struct {
identifiers map[K]struct{}
queue *PriorityQueue[T]
identifiers map[K]int64
queue *PriorityQueue[versionedItem[T]]
mu sync.RWMutex
indexer IndexerFunc[K, T]
}

// versionedItem wraps the actual data item with a version number.
// This structure is used internally by HashedPriorityQueue to implement
// the versioning mechanism that allows for efficient updates and
// lazy removal of outdated items. The queue is only interested in
// the latest version of an item for each unique key:
// - data: The actual item of type T stored in the queue.
// - version: A monotonically increasing number representing the
// version of this item. When an item with the same key is enqueued,
// its version is incremented. This allows the queue to identify
// the most recent version during dequeue operations and discard
// any older versions of the same item.
type versionedItem[T any] struct {
data T
version int64
}

// IndexerFunc is used to find the key (of type K) from the provided
// item (T). This will be used for the item lookup in `Contains`
type IndexerFunc[K comparable, T any] func(item T) K
Expand All @@ -18,12 +64,24 @@ type IndexerFunc[K comparable, T any] func(item T) K
// be used on Enqueue/Dequeue to keep the index up to date.
func NewHashedPriorityQueue[K comparable, T any](indexer IndexerFunc[K, T]) *HashedPriorityQueue[K, T] {
return &HashedPriorityQueue[K, T]{
identifiers: make(map[K]struct{}),
queue: NewPriorityQueue[T](),
identifiers: make(map[K]int64),
queue: NewPriorityQueue[versionedItem[T]](),
indexer: indexer,
}
}

// isLatestVersion checks if the given item is the latest version
func (q *HashedPriorityQueue[K, T]) isLatestVersion(item versionedItem[T]) bool {
k := q.indexer(item.data)
currentVersion := q.identifiers[k]
return item.version == currentVersion
}

// unwrapQueueItem converts a versionedItem to a QueueItem
func (q *HashedPriorityQueue[K, T]) unwrapQueueItem(item *QueueItem[versionedItem[T]]) *QueueItem[T] {
return &QueueItem[T]{Value: item.Value.data, Priority: item.Priority}
}

// Contains will return true if the provided identifier (of type K)
// will be found in this queue, false if it is not present.
func (q *HashedPriorityQueue[K, T]) Contains(id K) bool {
Expand All @@ -40,9 +98,9 @@ func (q *HashedPriorityQueue[K, T]) Enqueue(data T, priority int64) {
defer q.mu.Unlock()

k := q.indexer(data)

q.identifiers[k] = struct{}{}
q.queue.Enqueue(data, priority)
version := q.identifiers[k] + 1
q.identifiers[k] = version
q.queue.Enqueue(versionedItem[T]{data: data, version: version}, priority)
}

// Dequeue returns the next highest priority item, returning both
Expand All @@ -53,16 +111,39 @@ func (q *HashedPriorityQueue[K, T]) Dequeue() *QueueItem[T] {
q.mu.Lock()
defer q.mu.Unlock()

item := q.queue.Dequeue()
if item == nil {
return nil
for {
item := q.queue.Dequeue()
if item == nil {
return nil
}

if q.isLatestVersion(item.Value) {
k := q.indexer(item.Value.data)
delete(q.identifiers, k)
return q.unwrapQueueItem(item)
}
}
}

// Peek returns the next highest priority item without removing it from the queue.
// It returns nil if the queue is empty.
func (q *HashedPriorityQueue[K, T]) Peek() *QueueItem[T] {
q.mu.RLock()
defer q.mu.RUnlock()

// Find the key for the item and delete it from the presence map
k := q.indexer(item.Value)
delete(q.identifiers, k)
for {
item := q.queue.Peek()
if item == nil {
return nil
}

return item
if q.isLatestVersion(item.Value) {
return q.unwrapQueueItem(item)
}

// If the peeked item is outdated, remove it and continue
q.queue.Dequeue()
}
}

// DequeueWhere allows the caller to iterate through the queue, in priority order, and
Expand All @@ -74,26 +155,32 @@ func (q *HashedPriorityQueue[K, T]) DequeueWhere(matcher MatchingFunction[T]) *Q
q.mu.Lock()
defer q.mu.Unlock()

item := q.queue.DequeueWhere(matcher)
if item == nil {
return nil
}
for {
item := q.queue.DequeueWhere(func(vi versionedItem[T]) bool {
return matcher(vi.data)
})

k := q.indexer(item.Value)
delete(q.identifiers, k)
if item == nil {
return nil
}

return item
if q.isLatestVersion(item.Value) {
k := q.indexer(item.Value.data)
delete(q.identifiers, k)
return q.unwrapQueueItem(item)
}
}
}

// Len returns the number of items currently in the queue
func (q *HashedPriorityQueue[K, T]) Len() int {
return q.queue.Len()
return len(q.identifiers)
}

// IsEmpty returns a boolean denoting whether the queue is
// currently empty or not.
func (q *HashedPriorityQueue[K, T]) IsEmpty() bool {
return q.queue.Len() == 0
return q.Len() == 0
}

var _ PriorityQueueInterface[struct{}] = (*HashedPriorityQueue[string, struct{}])(nil)
Loading

0 comments on commit c1ec5c0

Please sign in to comment.