Skip to content

Commit

Permalink
Merge branch 'main' into eng-192-compute-node-fails-to-start-on-small…
Browse files Browse the repository at this point in the history
…-nodes
  • Loading branch information
wdbaruni authored Dec 18, 2024
2 parents e4c3408 + 76836dd commit 11d9d2c
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 24 deletions.
14 changes: 10 additions & 4 deletions cmd/cli/job/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,18 @@ var (
Value: func(e *models.Execution) string { return output.Elapsed(e.GetModifyTime()) },
}
executionColumnID = output.TableColumn[*models.Execution]{
ColumnConfig: table.ColumnConfig{Name: "ID", WidthMax: 10, WidthMaxEnforcer: text.WrapText},
Value: func(e *models.Execution) string { return idgen.ShortUUID(e.ID) },
ColumnConfig: table.ColumnConfig{
Name: "ID",
WidthMax: idgen.ShortIDLengthWithPrefix,
WidthMaxEnforcer: func(col string, maxLen int) string { return idgen.ShortUUID(col) }},
Value: func(e *models.Execution) string { return e.ID },
}
executionColumnNodeID = output.TableColumn[*models.Execution]{
ColumnConfig: table.ColumnConfig{Name: "Node ID", WidthMax: 10, WidthMaxEnforcer: text.WrapText},
Value: func(e *models.Execution) string { return idgen.ShortNodeID(e.NodeID) },
ColumnConfig: table.ColumnConfig{
Name: "Node ID",
WidthMax: idgen.ShortIDLengthWithPrefix,
WidthMaxEnforcer: func(col string, maxLen int) string { return idgen.ShortUUID(col) }},
Value: func(e *models.Execution) string { return e.NodeID },
}
executionColumnRev = output.TableColumn[*models.Execution]{
ColumnConfig: table.ColumnConfig{Name: "Rev.", WidthMax: 4, WidthMaxEnforcer: text.WrapText},
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/job/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewHistoryCmd() *cobra.Command {
}

var historyColumns = []output.TableColumn[*models.JobHistory]{
cols.HistoryTime,
cols.HistoryDateTime,
cols.HistoryLevel,
cols.HistoryExecID,
cols.HistoryTopic,
Expand Down
7 changes: 5 additions & 2 deletions cmd/cli/node/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ import (

var alwaysColumns = []output.TableColumn[*models.NodeState]{
{
ColumnConfig: table.ColumnConfig{Name: "id"},
Value: func(node *models.NodeState) string { return idgen.ShortNodeID(node.Info.ID()) },
ColumnConfig: table.ColumnConfig{
Name: "id",
WidthMax: idgen.ShortIDLengthWithPrefix,
WidthMaxEnforcer: func(col string, maxLen int) string { return idgen.ShortNodeID(col) }},
Value: func(node *models.NodeState) string { return node.Info.ID() },
},
{
ColumnConfig: table.ColumnConfig{Name: "type"},
Expand Down
11 changes: 5 additions & 6 deletions cmd/util/cols/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ import (
)

var (
HistoryTime = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Time", WidthMax: len(time.StampMilli), WidthMaxEnforcer: output.ShortenTime},
Value: func(j *models.JobHistory) string { return j.Occurred().Format(time.StampMilli) },
}
HistoryTimeOnly = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Time", WidthMax: len(TimeOnlyMilli), WidthMaxEnforcer: text.Trim},
Value: func(j *models.JobHistory) string { return j.Occurred().Format(TimeOnlyMilli) },
Expand All @@ -31,12 +27,15 @@ var (
Value: func(jwi *models.JobHistory) string { return jwi.Type.String() },
}
HistoryExecID = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Exec. ID", WidthMax: 10, WidthMaxEnforcer: text.WrapText},
ColumnConfig: table.ColumnConfig{
Name: "Exec. ID",
WidthMax: idgen.ShortIDLengthWithPrefix,
WidthMaxEnforcer: func(col string, maxLen int) string { return idgen.ShortUUID(col) }},
Value: func(j *models.JobHistory) string {
if j.ExecutionID == "" {
return ""
}
return idgen.ShortUUID(j.ExecutionID)
return j.ExecutionID
},
}
HistoryTopic = output.TableColumn[*models.JobHistory]{
Expand Down
1 change: 0 additions & 1 deletion pkg/models/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (n NoopNodeInfoDecorator) DecorateNodeInfo(ctx context.Context, nodeInfo No
// to further its view of the networks conditions. ComputeNodeInfo is non-nil iff the NodeType is NodeTypeCompute.
// TODO(walid): add Validate() method to NodeInfo and make sure it is called in all the places where it is initialized
type NodeInfo struct {
// TODO replace all access on this field with the `ID()` method
NodeID string `json:"NodeID"`
NodeType NodeType `json:"NodeType"`
Labels map[string]string `json:"Labels"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/nats/proxy/management_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (p *ManagementProxy) Register(ctx context.Context,
var asyncRes *concurrency.AsyncResult[legacy.RegisterResponse]

asyncRes, err = send[legacy.RegisterRequest, legacy.RegisterResponse](
ctx, p.conn, request.Info.NodeID, request, RegisterNode)
ctx, p.conn, request.Info.ID(), request, RegisterNode)

if err != nil {
return nil, errors.Wrap(err, "failed to send response to registration request")
Expand Down
8 changes: 4 additions & 4 deletions pkg/orchestrator/selection/ranking/available_capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (s *AvailableCapacityNodeRanker) calculateWeightedCapacities(nodes []models
weightedAvailableCapacity := weightedCapacity(node.ComputeNodeInfo.AvailableCapacity, weights)
weightedQueueUsedCapacity := weightedCapacity(node.ComputeNodeInfo.QueueUsedCapacity, weights)

weightedAvailableCapacities[node.NodeID] = weightedAvailableCapacity
weightedQueueCapacities[node.NodeID] = weightedQueueUsedCapacity
weightedAvailableCapacities[node.ID()] = weightedAvailableCapacity
weightedQueueCapacities[node.ID()] = weightedQueueUsedCapacity

if weightedAvailableCapacity > maxWeightedAvailableCapacity {
maxWeightedAvailableCapacity = weightedAvailableCapacity
Expand All @@ -124,8 +124,8 @@ func (s *AvailableCapacityNodeRanker) rankNodesBasedOnCapacities(ctx context.Con
ranks := make([]orchestrator.NodeRank, len(nodes))

for i, node := range nodes {
weightedAvailableCapacity := wAvailableCapacities[node.NodeID]
weightedQueueUsedCapacity := wQueueCapacities[node.NodeID]
weightedAvailableCapacity := wAvailableCapacities[node.ID()]
weightedQueueUsedCapacity := wQueueCapacities[node.ID()]

// Calculate the ratios of available and queue capacities
availableRatio := 0.0
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/bprotocol/orchestrator/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (s *HeartbeatTestSuite) TestHeartbeatScenarios() {

s.clock.Add(tc.waitUntil)

nodeState, err := s.nodeManager.Get(ctx, nodeInfo.NodeID)
nodeState, err := s.nodeManager.Get(ctx, nodeInfo.ID())
if tc.handshake {
s.Require().NoError(err)
s.Require().Equal(tc.expectedState, nodeState.ConnectionState.Status, fmt.Sprintf("incorrect state in %s", tc.name))
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/nclprotocol/compute/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (cp *ControlPlane) heartbeat(ctx context.Context) error {
cp.latestNodeInfo = nodeInfo

msg := envelope.NewMessage(messages.HeartbeatRequest{
NodeID: cp.latestNodeInfo.NodeID,
NodeID: cp.latestNodeInfo.ID(),
AvailableCapacity: nodeInfo.ComputeNodeInfo.AvailableCapacity,
QueueUsedCapacity: nodeInfo.ComputeNodeInfo.QueueUsedCapacity,
LastOrchestratorSeqNum: cp.incomingSeqTracker.GetLastSeqNum(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/nclprotocol/compute/controlplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *ControlPlaneTestSuite) TestHeartbeat() {

nodeInfo := s.nodeInfoProvider.GetNodeInfo(s.ctx)
heartbeatMsg := envelope.NewMessage(messages.HeartbeatRequest{
NodeID: nodeInfo.NodeID,
NodeID: nodeInfo.ID(),
AvailableCapacity: nodeInfo.ComputeNodeInfo.AvailableCapacity,
QueueUsedCapacity: nodeInfo.ComputeNodeInfo.QueueUsedCapacity,
LastOrchestratorSeqNum: s.seqTracker.GetLastSeqNum(),
Expand Down
4 changes: 2 additions & 2 deletions pkg/transport/nclprotocol/compute/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *ConnectionManagerTestSuite) TestSuccessfulConnection() {
heartbeats := s.mockResponder.GetHeartbeats()
s.Require().Len(heartbeats, 1)
s.Require().Equal(messages.HeartbeatRequest{
NodeID: nodeInfo.NodeID,
NodeID: nodeInfo.ID(),
AvailableCapacity: nodeInfo.ComputeNodeInfo.AvailableCapacity,
QueueUsedCapacity: nodeInfo.ComputeNodeInfo.QueueUsedCapacity,
LastOrchestratorSeqNum: handshakeSeqNum, // Should use sequence number from handshake response
Expand All @@ -178,7 +178,7 @@ func (s *ConnectionManagerTestSuite) TestSuccessfulConnection() {
s.Require().Eventually(func() bool {
lastHeartbeat := s.mockResponder.GetHeartbeats()[len(s.mockResponder.GetHeartbeats())-1]
return reflect.DeepEqual(lastHeartbeat, messages.HeartbeatRequest{
NodeID: nodeInfo.NodeID,
NodeID: nodeInfo.ID(),
AvailableCapacity: nodeInfo.ComputeNodeInfo.AvailableCapacity,
QueueUsedCapacity: nodeInfo.ComputeNodeInfo.QueueUsedCapacity,
LastOrchestratorSeqNum: handshakeSeqNum, // Should continue using sequence number from handshake
Expand Down

0 comments on commit 11d9d2c

Please sign in to comment.