Skip to content

Commit 36c44b5

Browse files
authored
Faster reconnect on handshake required response (#4772)
When the orchestrator restarts, compute nodes wait for 5 failed heartbeats (~75s) before attempting to reconnect, even though the orchestrator immediately returns "Handshake required" errors. Modify compute nodes to detect this specific error and trigger immediate reconnection, rather than waiting for the heartbeat failure threshold.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Enhanced error handling for heartbeat operations, specifically addressing handshake requirements.
  - New boolean field `HandshakeRequired` added to track handshake necessity.
- **Bug Fixes**
  - Improved robustness of connection health monitoring by incorporating handshake checks.
- **Tests**
  - Added tests for new handshake handling scenarios in both `ControlPlaneTestSuite` and `ConnectionManagerTestSuite`.
  - Enhanced coverage for `HealthTracker` functionality regarding handshake states.
- **Documentation**
  - Updated comments in connection health checks for clarity on new criteria.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 38572c1 commit 36c44b5

File tree

7 files changed

+198
-0
lines changed

7 files changed

+198
-0
lines changed

pkg/transport/nclprotocol/compute/controlplane.go

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package compute
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"sync"
78
"time"
89

@@ -104,6 +105,10 @@ func (cp *ControlPlane) run(ctx context.Context) {
104105

105106
case <-heartbeat.C:
106107
if err := cp.heartbeat(ctx); err != nil {
108+
if strings.Contains(err.Error(), "handshake required") {
109+
cp.healthTracker.HandshakeRequired()
110+
return
111+
}
107112
log.Error().Err(err).Msg("Failed to send heartbeat")
108113
}
109114
case <-nodeInfo.C:

pkg/transport/nclprotocol/compute/controlplane_test.go

+67
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/bacalhau-project/bacalhau/pkg/lib/envelope"
1616
"github.com/bacalhau-project/bacalhau/pkg/lib/ncl"
1717
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
18+
"github.com/bacalhau-project/bacalhau/pkg/orchestrator/nodes"
1819
"github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol"
1920
nclprotocolcompute "github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol/compute"
2021
ncltest "github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol/test"
@@ -179,6 +180,72 @@ func (s *ControlPlaneTestSuite) TestHeartbeat() {
179180
}, 100*time.Millisecond, 10*time.Millisecond, "Heartbeat did not succeed")
180181
}
181182

183+
func (s *ControlPlaneTestSuite) TestHeartbeatFailFastOnHandshakeRequired() {
184+
// Create control plane with only heartbeat enabled and short intervals
185+
controlPlane := s.createControlPlane(
186+
50*time.Millisecond, // heartbeat
187+
1*time.Hour, // node info - disabled
188+
1*time.Hour, // checkpoint - disabled
189+
)
190+
defer s.Require().NoError(controlPlane.Stop(s.ctx))
191+
192+
// Setup handshake required error response
193+
s.requester.EXPECT().
194+
Request(gomock.Any(), gomock.Any()).
195+
Return(nil, nodes.NewErrHandshakeRequired("test-node")).
196+
Times(1) // Should only try once
197+
198+
// Start control plane
199+
s.Require().NoError(controlPlane.Start(s.ctx))
200+
201+
// Wait a bit to allow for heartbeat attempt
202+
time.Sleep(50 * time.Millisecond)
203+
204+
// Wait for the health tracker to report that a handshake is required
205+
s.Require().Eventually(func() bool {
206+
return s.healthTracker.GetHealth().HandshakeRequired
207+
}, 100*time.Millisecond, 10*time.Millisecond, "Heartbeat did not succeed")
208+
209+
// Verify health tracker state
210+
health := s.healthTracker.GetHealth()
211+
s.True(health.HandshakeRequired, "handshake should be marked as required")
212+
s.Zero(health.LastSuccessfulHeartbeat, "no successful heartbeat should be recorded")
213+
214+
// Wait for another heartbeat interval to verify the loop has stopped
215+
time.Sleep(70 * time.Millisecond)
216+
217+
s.Require().Eventually(func() bool {
218+
return s.healthTracker.GetHealth().CurrentState == nclprotocol.Disconnected
219+
}, 100*time.Millisecond, 10*time.Millisecond, "connection not marked as disconnected")
220+
}
221+
222+
func (s *ControlPlaneTestSuite) TestHeartbeatContinuesOnOtherErrors() {
223+
// Create control plane with only heartbeat enabled
224+
controlPlane := s.createControlPlane(
225+
50*time.Millisecond, // heartbeat
226+
1*time.Hour, // node info - disabled
227+
1*time.Hour, // checkpoint - disabled
228+
)
229+
defer s.Require().NoError(controlPlane.Stop(s.ctx))
230+
231+
// Setup regular error response that should not cause fail-fast
232+
s.requester.EXPECT().
233+
Request(gomock.Any(), gomock.Any()).
234+
Return(nil, fmt.Errorf("network error")).
235+
Times(2) // Should keep trying
236+
237+
// Start control plane
238+
s.Require().NoError(controlPlane.Start(s.ctx))
239+
240+
// Wait for two heartbeat attempts
241+
time.Sleep(120 * time.Millisecond)
242+
243+
// Verify health tracker state
244+
health := s.healthTracker.GetHealth()
245+
s.False(health.HandshakeRequired, "handshake should not be marked as required")
246+
s.Zero(health.LastSuccessfulHeartbeat, "no successful heartbeat should be recorded")
247+
}
248+
182249
func (s *ControlPlaneTestSuite) TestNodeInfoUpdate() {
183250
// Create control plane with only checkpointing enabled
184251
controlPlane := s.createControlPlane(

pkg/transport/nclprotocol/compute/health_tracker.go

+17
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func (ht *HealthTracker) MarkConnected() {
3636
ht.health.LastSuccessfulHeartbeat = ht.clock.Now()
3737
ht.health.ConsecutiveFailures = 0
3838
ht.health.LastError = nil
39+
ht.health.HandshakeRequired = false
3940
}
4041

4142
// MarkDisconnected updates status when connection is lost
@@ -46,6 +47,7 @@ func (ht *HealthTracker) MarkDisconnected(err error) {
4647
ht.health.CurrentState = nclprotocol.Disconnected
4748
ht.health.LastError = err
4849
ht.health.ConsecutiveFailures++
50+
ht.health.HandshakeRequired = false
4951
}
5052

5153
// MarkConnecting updates status when a connection is in progress
@@ -54,6 +56,7 @@ func (ht *HealthTracker) MarkConnecting() {
5456
defer ht.mu.Unlock()
5557

5658
ht.health.CurrentState = nclprotocol.Connecting
59+
ht.health.HandshakeRequired = false
5760
}
5861

5962
// HeartbeatSuccess records successful heartbeat
@@ -70,6 +73,13 @@ func (ht *HealthTracker) UpdateSuccess() {
7073
ht.health.LastSuccessfulUpdate = ht.clock.Now()
7174
}
7275

76+
// HandshakeRequired marks that a handshake is required
77+
func (ht *HealthTracker) HandshakeRequired() {
78+
ht.mu.Lock()
79+
defer ht.mu.Unlock()
80+
ht.health.HandshakeRequired = true
81+
}
82+
7383
// GetState returns current connection state
7484
func (ht *HealthTracker) GetState() nclprotocol.ConnectionState {
7585
ht.mu.RLock()
@@ -83,3 +93,10 @@ func (ht *HealthTracker) GetHealth() nclprotocol.ConnectionHealth {
8393
defer ht.mu.RUnlock()
8494
return ht.health
8595
}
96+
97+
// IsHandshakeRequired returns true if a handshake is required
98+
func (ht *HealthTracker) IsHandshakeRequired() bool {
99+
ht.mu.RLock()
100+
defer ht.mu.RUnlock()
101+
return ht.health.HandshakeRequired
102+
}

pkg/transport/nclprotocol/compute/health_tracker_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,13 @@ func (s *HealthTrackerTestSuite) TestInitialState() {
3939
s.Require().Equal(0, health.ConsecutiveFailures)
4040
s.Require().Nil(health.LastError)
4141
s.Require().True(health.ConnectedSince.IsZero())
42+
s.Require().False(health.HandshakeRequired) // Verify initial state of HandshakeRequired
4243
}
4344

4445
func (s *HealthTrackerTestSuite) TestMarkConnected() {
46+
// First mark handshake required
47+
s.tracker.HandshakeRequired()
48+
4549
// Advance clock to have distinct timestamps
4650
s.clock.Add(time.Second)
4751
connectedTime := s.clock.Now()
@@ -54,6 +58,7 @@ func (s *HealthTrackerTestSuite) TestMarkConnected() {
5458
s.Require().Equal(connectedTime, health.LastSuccessfulHeartbeat)
5559
s.Require().Equal(0, health.ConsecutiveFailures)
5660
s.Require().Nil(health.LastError)
61+
s.Require().False(health.HandshakeRequired) // Should be reset when connected
5762
}
5863

5964
func (s *HealthTrackerTestSuite) TestMarkDisconnected() {
@@ -68,13 +73,39 @@ func (s *HealthTrackerTestSuite) TestMarkDisconnected() {
6873
s.Require().Equal(nclprotocol.Disconnected, health.CurrentState)
6974
s.Require().Equal(expectedErr, health.LastError)
7075
s.Require().Equal(1, health.ConsecutiveFailures)
76+
s.Require().False(health.HandshakeRequired) // Should still be false after disconnect
7177

7278
// Multiple disconnections should increment failure count
7379
s.tracker.MarkDisconnected(expectedErr)
7480
health = s.tracker.GetHealth()
7581
s.Require().Equal(2, health.ConsecutiveFailures)
7682
}
7783

84+
func (s *HealthTrackerTestSuite) TestHandshakeRequired() {
85+
// Initially handshake should not be required
86+
s.Require().False(s.tracker.IsHandshakeRequired())
87+
88+
// Mark handshake as required
89+
s.tracker.HandshakeRequired()
90+
s.Require().True(s.tracker.IsHandshakeRequired())
91+
92+
// Verify it's cleared when connected
93+
s.tracker.MarkConnected()
94+
s.Require().False(s.tracker.IsHandshakeRequired())
95+
96+
// Verify it's cleared when disconnected
97+
s.tracker.HandshakeRequired()
98+
s.Require().True(s.tracker.IsHandshakeRequired())
99+
s.tracker.MarkDisconnected(fmt.Errorf("error"))
100+
s.Require().False(s.tracker.IsHandshakeRequired())
101+
102+
// Verify it's cleared when connecting
103+
s.tracker.HandshakeRequired()
104+
s.Require().True(s.tracker.IsHandshakeRequired())
105+
s.tracker.MarkConnecting()
106+
s.Require().False(s.tracker.IsHandshakeRequired())
107+
}
108+
78109
func (s *HealthTrackerTestSuite) TestSuccessfulOperations() {
79110
// Initial timestamps
80111
s.clock.Add(time.Second)

pkg/transport/nclprotocol/compute/manager.go

+4
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ func (cm *ConnectionManager) checkConnectionHealth() {
431431
// Consider connection unhealthy if:
432432
// 1. No heartbeat succeeded within HeartbeatMissFactor intervals
433433
// 2. NATS connection is closed/draining
434+
// 3. Health tracker reports a handshake required
434435
now := cm.config.Clock.Now()
435436
heartbeatDeadline := now.Add(-time.Duration(cm.config.HeartbeatMissFactor) * cm.config.HeartbeatInterval)
436437

@@ -443,6 +444,9 @@ func (cm *ConnectionManager) checkConnectionHealth() {
443444
} else if cm.natsConn.IsClosed() {
444445
reason = "NATS connection closed"
445446
unhealthy = true
447+
} else if cm.healthTracker.IsHandshakeRequired() {
448+
reason = "handshake required"
449+
unhealthy = true
446450
}
447451

448452
if unhealthy {

pkg/transport/nclprotocol/compute/manager_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"fmt"
88
"reflect"
9+
"strings"
910
"testing"
1011
"time"
1112

@@ -18,6 +19,7 @@ import (
1819
"github.com/bacalhau-project/bacalhau/pkg/models"
1920
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
2021
natsutil "github.com/bacalhau-project/bacalhau/pkg/nats"
22+
"github.com/bacalhau-project/bacalhau/pkg/orchestrator/nodes"
2123
testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils"
2224
"github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol"
2325
nclprotocolcompute "github.com/bacalhau-project/bacalhau/pkg/transport/nclprotocol/compute"
@@ -239,6 +241,77 @@ func (s *ConnectionManagerTestSuite) TestHeartbeatFailure() {
239241
}, time.Second, 10*time.Millisecond)
240242
}
241243

244+
func (s *ConnectionManagerTestSuite) TestHeartbeatHandshakeRequired() {
245+
err := s.manager.Start(s.ctx)
246+
s.Require().NoError(err)
247+
248+
// Wait for initial connection
249+
s.Require().Eventually(func() bool {
250+
health := s.manager.GetHealth()
251+
return health.CurrentState == nclprotocol.Connected
252+
}, time.Second, 10*time.Millisecond, "manager should connect initially")
253+
254+
// Configure heartbeat to require handshake
255+
s.mockResponder.Behaviour().HeartbeatResponse.Error = nodes.NewErrHandshakeRequired("test-node")
256+
257+
// Should disconnect quickly after handshake required error
258+
s.Require().Eventually(func() bool {
259+
health := s.manager.GetHealth()
260+
return health.CurrentState == nclprotocol.Disconnected &&
261+
health.LastError != nil &&
262+
strings.Contains(health.LastError.Error(), "handshake required")
263+
}, 1*time.Second, 5*time.Millisecond, "should disconnect due to handshake required: %+v", s.manager.GetHealth())
264+
265+
// Reset heartbeat response to succeed
266+
s.mockResponder.Behaviour().HeartbeatResponse.Error = nil
267+
268+
// Should automatically attempt reconnection
269+
s.Require().Eventually(func() bool {
270+
// Get new handshakes after disconnect
271+
handshakes := s.mockResponder.GetHandshakes()
272+
return len(handshakes) > 1 // More than initial handshake
273+
}, time.Second, 10*time.Millisecond, "should attempt reconnection")
274+
275+
// Should successfully reconnect
276+
s.Require().Eventually(func() bool {
277+
health := s.manager.GetHealth()
278+
return health.CurrentState == nclprotocol.Connected &&
279+
!health.HandshakeRequired // Should be cleared after successful connection
280+
}, time.Second, 10*time.Millisecond, "should reconnect successfully")
281+
282+
// Verify heartbeats resume
283+
time.Sleep(s.config.HeartbeatInterval * 2)
284+
heartbeats := s.mockResponder.GetHeartbeats()
285+
s.Require().NotEmpty(heartbeats, "should resume heartbeats after reconnection")
286+
}
287+
288+
func (s *ConnectionManagerTestSuite) TestHeartbeatHandshakeRequiredDifferentError() {
289+
err := s.manager.Start(s.ctx)
290+
s.Require().NoError(err)
291+
292+
// Wait for initial connection
293+
s.Require().Eventually(func() bool {
294+
health := s.manager.GetHealth()
295+
return health.CurrentState == nclprotocol.Connected
296+
}, time.Second, 10*time.Millisecond)
297+
298+
// Configure heartbeat with error that mentions handshake but isn't the specific error
299+
s.mockResponder.Behaviour().HeartbeatResponse.Error = fmt.Errorf("failed to process handshake data")
300+
301+
// Wait some heartbeat intervals - should not immediately disconnect
302+
time.Sleep(s.config.HeartbeatInterval * 2)
303+
health := s.manager.GetHealth()
304+
s.False(health.HandshakeRequired, "should not set handshake required for different errors")
305+
306+
// Should eventually disconnect due to missed heartbeats
307+
time.Sleep(s.config.HeartbeatInterval * time.Duration(s.config.HeartbeatMissFactor+1))
308+
s.Eventually(func() bool {
309+
health := s.manager.GetHealth()
310+
return health.CurrentState == nclprotocol.Disconnected &&
311+
!health.HandshakeRequired // Should not be set
312+
}, time.Second, 10*time.Millisecond)
313+
}
314+
242315
func (s *ConnectionManagerTestSuite) TestNodeInfoUpdates() {
243316
// Configure heartbeat callback to trigger node info updates
244317
s.mockResponder.Behaviour().OnHeartbeat = func(req messages.HeartbeatRequest) {

pkg/transport/nclprotocol/types.go

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type ConnectionHealth struct {
5050
ConsecutiveFailures int
5151
LastError error
5252
ConnectedSince time.Time
53+
HandshakeRequired bool
5354
}
5455

5556
const (

0 commit comments

Comments
 (0)