Skip to content

Commit

Permalink
Replace NodeID field access with ID()
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed Dec 18, 2024
1 parent 7cf0f35 commit 87b51c9
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 11 deletions.
1 change: 0 additions & 1 deletion pkg/models/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (n NoopNodeInfoDecorator) DecorateNodeInfo(ctx context.Context, nodeInfo No
// to further its view of the networks conditions. ComputeNodeInfo is non-nil iff the NodeType is NodeTypeCompute.
// TODO(walid): add Validate() method to NodeInfo and make sure it is called in all the places where it is initialized
type NodeInfo struct {
// TODO replace all access on this field with the `ID()` method
NodeID string `json:"NodeID"`
NodeType NodeType `json:"NodeType"`
Labels map[string]string `json:"Labels"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/nats/proxy/management_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (p *ManagementProxy) Register(ctx context.Context,
var asyncRes *concurrency.AsyncResult[legacy.RegisterResponse]

asyncRes, err = send[legacy.RegisterRequest, legacy.RegisterResponse](
ctx, p.conn, request.Info.NodeID, request, RegisterNode)
ctx, p.conn, request.Info.ID(), request, RegisterNode)

if err != nil {
return nil, errors.Wrap(err, "failed to send response to registration request")
Expand Down
8 changes: 4 additions & 4 deletions pkg/orchestrator/selection/ranking/available_capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (s *AvailableCapacityNodeRanker) calculateWeightedCapacities(nodes []models
weightedAvailableCapacity := weightedCapacity(node.ComputeNodeInfo.AvailableCapacity, weights)
weightedQueueUsedCapacity := weightedCapacity(node.ComputeNodeInfo.QueueUsedCapacity, weights)

weightedAvailableCapacities[node.NodeID] = weightedAvailableCapacity
weightedQueueCapacities[node.NodeID] = weightedQueueUsedCapacity
weightedAvailableCapacities[node.ID()] = weightedAvailableCapacity
weightedQueueCapacities[node.ID()] = weightedQueueUsedCapacity

if weightedAvailableCapacity > maxWeightedAvailableCapacity {
maxWeightedAvailableCapacity = weightedAvailableCapacity
Expand All @@ -124,8 +124,8 @@ func (s *AvailableCapacityNodeRanker) rankNodesBasedOnCapacities(ctx context.Con
ranks := make([]orchestrator.NodeRank, len(nodes))

for i, node := range nodes {
weightedAvailableCapacity := wAvailableCapacities[node.NodeID]
weightedQueueUsedCapacity := wQueueCapacities[node.NodeID]
weightedAvailableCapacity := wAvailableCapacities[node.ID()]
weightedQueueUsedCapacity := wQueueCapacities[node.ID()]

// Calculate the ratios of available and queue capacities
availableRatio := 0.0
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/bprotocol/orchestrator/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (s *HeartbeatTestSuite) TestHeartbeatScenarios() {

s.clock.Add(tc.waitUntil)

nodeState, err := s.nodeManager.Get(ctx, nodeInfo.NodeID)
nodeState, err := s.nodeManager.Get(ctx, nodeInfo.ID())
if tc.handshake {
s.Require().NoError(err)
s.Require().Equal(tc.expectedState, nodeState.ConnectionState.Status, fmt.Sprintf("incorrect state in %s", tc.name))
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/nclprotocol/compute/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (cp *ControlPlane) heartbeat(ctx context.Context) error {
cp.latestNodeInfo = nodeInfo

msg := envelope.NewMessage(messages.HeartbeatRequest{
NodeID: cp.latestNodeInfo.NodeID,
NodeID: cp.latestNodeInfo.ID(),
AvailableCapacity: nodeInfo.ComputeNodeInfo.AvailableCapacity,
QueueUsedCapacity: nodeInfo.ComputeNodeInfo.QueueUsedCapacity,
LastOrchestratorSeqNum: cp.incomingSeqTracker.GetLastSeqNum(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/nclprotocol/compute/controlplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *ControlPlaneTestSuite) TestHeartbeat() {

nodeInfo := s.nodeInfoProvider.GetNodeInfo(s.ctx)
heartbeatMsg := envelope.NewMessage(messages.HeartbeatRequest{
NodeID: nodeInfo.NodeID,
NodeID: nodeInfo.ID(),
AvailableCapacity: nodeInfo.ComputeNodeInfo.AvailableCapacity,
QueueUsedCapacity: nodeInfo.ComputeNodeInfo.QueueUsedCapacity,
LastOrchestratorSeqNum: s.seqTracker.GetLastSeqNum(),
Expand Down
4 changes: 2 additions & 2 deletions pkg/transport/nclprotocol/compute/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *ConnectionManagerTestSuite) TestSuccessfulConnection() {
heartbeats := s.mockResponder.GetHeartbeats()
s.Require().Len(heartbeats, 1)
s.Require().Equal(messages.HeartbeatRequest{
NodeID: nodeInfo.NodeID,
NodeID: nodeInfo.ID(),
AvailableCapacity: nodeInfo.ComputeNodeInfo.AvailableCapacity,
QueueUsedCapacity: nodeInfo.ComputeNodeInfo.QueueUsedCapacity,
LastOrchestratorSeqNum: handshakeSeqNum, // Should use sequence number from handshake response
Expand All @@ -178,7 +178,7 @@ func (s *ConnectionManagerTestSuite) TestSuccessfulConnection() {
s.Require().Eventually(func() bool {
lastHeartbeat := s.mockResponder.GetHeartbeats()[len(s.mockResponder.GetHeartbeats())-1]
return reflect.DeepEqual(lastHeartbeat, messages.HeartbeatRequest{
NodeID: nodeInfo.NodeID,
NodeID: nodeInfo.ID(),
AvailableCapacity: nodeInfo.ComputeNodeInfo.AvailableCapacity,
QueueUsedCapacity: nodeInfo.ComputeNodeInfo.QueueUsedCapacity,
LastOrchestratorSeqNum: handshakeSeqNum, // Should continue using sequence number from handshake
Expand Down

0 comments on commit 87b51c9

Please sign in to comment.