Skip to content

Commit e390884

Browse files
authored
fix orchestrator data-plane startSeqNum (#4745)
<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Introduced a new field, `StartingOrchestratorSeqNum`, in the handshake response to enhance information during node registration.
- **Bug Fixes**
  - Improved assertions in testing to ensure correct handling of sequence numbers for both new and reconnecting nodes during handshakes.
- **Documentation**
  - Updated comments and logging statements for clarity and consistency regarding the handshake process.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 559e78a commit e390884

File tree

4 files changed

+17
-8
lines changed

4 files changed

+17
-8
lines changed

pkg/models/messages/node.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ type HandshakeRequest struct {
1515

1616
// HandshakeResponse is sent in response to handshake requests
1717
type HandshakeResponse struct {
18-
Accepted bool `json:"accepted"`
19-
Reason string `json:"reason,omitempty"`
20-
LastComputeSeqNum uint64 `json:"LastComputeSeqNum"` // Last seq received from compute node
18+
Accepted bool `json:"accepted"`
19+
Reason string `json:"reason,omitempty"`
20+
LastComputeSeqNum uint64 `json:"LastComputeSeqNum"` // Last seq received from compute node
21+
StartingOrchestratorSeqNum uint64 `json:"LastOrchestratorSeqNum"` // Seq to start sending to compute node
2122
}
2223

2324
type HeartbeatRequest struct {

pkg/orchestrator/nodes/manager.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -492,9 +492,10 @@ func (n *nodesManager) Handshake(
492492
reason = "node reconnected"
493493
}
494494
return messages.HandshakeResponse{
495-
Accepted: true,
496-
Reason: reason,
497-
LastComputeSeqNum: state.ConnectionState.LastComputeSeqNum,
495+
Accepted: true,
496+
Reason: reason,
497+
LastComputeSeqNum: state.ConnectionState.LastComputeSeqNum,
498+
StartingOrchestratorSeqNum: state.ConnectionState.LastOrchestratorSeqNum,
498499
}, nil
499500
}
500501

pkg/orchestrator/nodes/manager_test.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ func (s *NodeManagerTestSuite) TestHeartbeatMaintainsConnection() {
148148
}
149149

150150
// Edge Cases and Error Scenarios
151-
152151
func (s *NodeManagerTestSuite) TestHandshakeSequenceNumberLogic() {
153152
// Test initial handshake with new node
154153
nodeInfo := s.createNodeInfo("new-node")
@@ -181,6 +180,8 @@ func (s *NodeManagerTestSuite) TestHandshakeSequenceNumberLogic() {
181180
s.Require().NoError(err)
182181
s.Assert().Equal(latestSeqNum, state.ConnectionState.LastOrchestratorSeqNum,
183182
"New node should be assigned latest sequence number")
183+
s.Assert().Equal(latestSeqNum, resp1.StartingOrchestratorSeqNum,
184+
"New node should receive latest sequence number as starting point")
184185

185186
// Update sequence numbers via heartbeat
186187
updatedOrchSeqNum := uint64(200)
@@ -210,6 +211,8 @@ func (s *NodeManagerTestSuite) TestHandshakeSequenceNumberLogic() {
210211
s.Require().NoError(err)
211212
s.Require().True(resp2.Accepted)
212213
s.Assert().Contains(resp2.Reason, "reconnected")
214+
s.Assert().Equal(updatedOrchSeqNum, resp2.StartingOrchestratorSeqNum,
215+
"Reconnecting node should receive its last known sequence number")
213216

214217
// Verify sequence numbers were preserved from previous state
215218
state, err = s.manager.Get(ctx, nodeInfo.ID())
@@ -235,6 +238,8 @@ func (s *NodeManagerTestSuite) TestHandshakeSequenceNumberEdgeCases() {
235238
s.Require().NoError(err)
236239
s.Assert().Equal(uint64(0), state1.ConnectionState.LastOrchestratorSeqNum,
237240
"New node should get zero sequence when event store is empty")
241+
s.Assert().Equal(uint64(0), resp1.StartingOrchestratorSeqNum,
242+
"New node should receive zero as starting sequence when event store is empty")
238243

239244
// Test concurrent handshakes with sequence numbers
240245
var wg sync.WaitGroup
@@ -271,6 +276,8 @@ func (s *NodeManagerTestSuite) TestHandshakeSequenceNumberEdgeCases() {
271276
s.Require().NoError(err)
272277
s.Assert().Equal(latestSeqNum, state.ConnectionState.LastOrchestratorSeqNum,
273278
"Concurrent new nodes should all get latest sequence number")
279+
s.Assert().Equal(latestSeqNum, resp.StartingOrchestratorSeqNum,
280+
"Concurrent new nodes should receive latest sequence number as starting point")
274281
}(i)
275282
}
276283

pkg/transport/nclprotocol/orchestrator/manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func (cm *ComputeManager) handleHandshakeRequest(ctx context.Context, msg *envel
186186
}
187187

188188
// Create data plane for accepted node
189-
if err = cm.setupDataPlane(ctx, request.NodeInfo, request.LastOrchestratorSeqNum); err != nil {
189+
if err = cm.setupDataPlane(ctx, request.NodeInfo, response.StartingOrchestratorSeqNum); err != nil {
190190
return nil, fmt.Errorf("setup data plane failed: %w", err)
191191
}
192192

0 commit comments

Comments
 (0)