Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions resulting in multiple data planes #4771

Merged

Conversation

wdbaruni
Copy link
Member

@wdbaruni wdbaruni commented Dec 16, 2024

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced error handling and cleanup processes for connection management.
    • State reset functionality added to the dispatcher during start and stop operations.
    • New recovery mechanisms for graceful stopping of the recovery process.
  • Bug Fixes

    • Improved error reporting during connection cleanup and state management.
  • Tests

    • Added new test methods to validate dispatcher state management and recovery processes.
    • Enhanced existing tests for better error handling and recovery logic validation.

Copy link

linear bot commented Dec 16, 2024

Copy link
Contributor

coderabbitai bot commented Dec 16, 2024

Walkthrough

This pull request introduces significant improvements to the connection and recovery management in the network communication protocol. The changes primarily focus on enhancing error handling, state management, and graceful shutdown mechanisms across multiple components of the dispatcher and connection manager. Key modifications include updating method signatures to return errors, adding state reset functionality, and implementing more robust stopping mechanisms for recovery processes.

Changes

File Change Summary
pkg/transport/nclprotocol/compute/manager.go - Updated cleanup method to return errors
- Modified Close method to handle cleanup errors
- Enhanced error propagation in connection processes
pkg/transport/nclprotocol/dispatcher/dispatcher.go - Added state reset in Start method
- Refined stop sequence in Stop method
- Improved control flow during dispatcher lifecycle
pkg/transport/nclprotocol/dispatcher/recovery.go - Added stopCh and wg to recovery struct
- Implemented new stop() method
- Enhanced recovery loop interruption mechanism
pkg/transport/nclprotocol/dispatcher/*_test.go - Added multiple new test methods for state reset and stop sequences
- Improved test coverage for recovery and dispatcher behaviors

Sequence Diagram

sequenceDiagram
    participant Dispatcher
    participant Recovery
    participant Watcher

    Dispatcher->>Dispatcher: Start()
    Dispatcher->>Dispatcher: Reset State
    Dispatcher->>Recovery: Start Recovery
    Recovery-->>Dispatcher: Recovery Initiated
    
    Dispatcher->>Dispatcher: Stop()
    Dispatcher->>Recovery: Stop Recovery
    Recovery-->>Dispatcher: Recovery Stopped
    Dispatcher->>Watcher: Stop Watcher
    Dispatcher->>Dispatcher: Cleanup
Loading

Possibly related PRs

Poem

🐰 Dispatcher's Dance, a Rabbit's Delight
Connections reset with graceful might,
Recovery loops now dance so bright,
Errors caught, no hidden plight,
Our code leaps forward with pure delight!
🚀 Hop, hop, hooray! 🎉

Tip

CodeRabbit's docstrings feature is now available as part of our Early Access Program! Simply use the command @coderabbitai generate docstrings to have CodeRabbit automatically generate docstrings for your pull request.


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (1)
pkg/transport/nclprotocol/dispatcher/dispatcher.go (1)

226-226: Implement recovery logic for stalled messages.

The TODO comment in the checkStalledMessages method indicates a placeholder for future recovery logic implementation:

// TODO: Could implement recovery logic here

Consider implementing appropriate recovery actions when stalled messages are detected, such as retrying the publish or triggering the recovery process. This will enhance the robustness of the dispatcher in handling message delivery issues.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f9751b5 and ec1ffee.

📒 Files selected for processing (6)
  • pkg/transport/nclprotocol/compute/manager.go (5 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher.go (3 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1 hunks)
  • pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1 hunks)
  • pkg/transport/nclprotocol/dispatcher/recovery.go (6 hunks)
  • pkg/transport/nclprotocol/dispatcher/recovery_test.go (2 hunks)
🔇 Additional comments (12)
pkg/transport/nclprotocol/dispatcher/dispatcher_e2e_test.go (1)

257-293: LGTM! Well-structured test for state reset functionality.

The test effectively validates the dispatcher's state reset behavior by:

  1. Processing an initial batch of events
  2. Stopping the dispatcher (which triggers checkpoint)
  3. Clearing received messages
  4. Starting a new dispatcher instance
  5. Verifying that only new events are processed while maintaining the checkpoint
pkg/transport/nclprotocol/compute/manager.go (4)

148-148: LGTM! Improved error propagation.

The Close method now properly propagates errors from the cleanup process.


Line range hint 158-190: LGTM! Enhanced error handling in cleanup.

The cleanup method now:

  1. Returns aggregated errors using errors.Join
  2. Provides detailed error context for each component
  3. Maintains proper cleanup order: subscriber → data plane → control plane → NATS connection

208-218: LGTM! Improved connection management.

Good improvements to the connect method:

  1. Cleanup of existing components before reconnecting
  2. Proper error handling in the defer function
  3. Logging of cleanup errors after connection failures

Line range hint 158-218: Effective fix for race conditions.

The changes effectively prevent multiple data planes by:

  1. Ensuring complete cleanup before reconnection attempts
  2. Sequential component cleanup (subscriber → data plane → control plane → NATS)
  3. Proper error handling and propagation
  4. Improved state management during transitions

This systematic approach prevents race conditions that could lead to multiple data plane instances.

pkg/transport/nclprotocol/dispatcher/recovery_test.go (3)

143-151: Verify the recovery sequence in TestRecoveryLoopWithRunningWatcher.

In the TestRecoveryLoopWithRunningWatcher method, confirm that the mocked expectations accurately represent the scenario where the watcher is already running. Ensure that the Stats() method returns the watcher.StateRunning state as intended, and check that the recovery logic handles this state correctly.

Also applies to: 153-159


170-199: Handle context cancellation correctly in TestRecoveryLoopWithContextCancellation.

In the TestRecoveryLoopWithContextCancellation method, verify that cancelling the context leads to the expected termination of the recovery loop. Ensure that all resources are properly released and that there are no goroutine leaks upon context cancellation.


220-262: Validate new tests for stopping recovery mechanisms.

The new tests TestStopDuringRecovery, TestStopAfterRecoveryComplete, TestMultipleStopCalls, and TestStopAndReset effectively verify the stopping behavior of the recovery process. Ensure that:

  • The stop method correctly interrupts the recovery loop.
  • Multiple calls to stop are idempotent and do not cause unintended side effects.
  • The recovery state is properly reset after stopping, allowing for subsequent recovery operations.
pkg/transport/nclprotocol/dispatcher/dispatcher_test.go (1)

175-196: Confirm state reset behavior upon restart in TestStartResetsState.

In the TestStartResetsState method, verify that the dispatcher correctly resets its internal state when started after a stop. Ensure that:

  • All internal counters and flags are reinitialized.
  • The watcher starts without residual state from the previous run.
  • No errors occur during the restart process.
pkg/transport/nclprotocol/dispatcher/dispatcher.go (3)

95-99: Initialize dispatcher state before starting.

In the Start method, the addition of:

// Reset state before starting
d.state.reset()
d.recovery.reset()

ensures that the dispatcher and recovery state are properly initialized before processing begins. This is essential to prevent residual state from previous runs affecting the current operation.


129-131: Validate recovery stop during dispatcher shutdown.

In the Stop method, the call to d.recovery.stop() should ensure that the recovery process is gracefully terminated before proceeding with the shutdown sequence. Confirm that this call effectively stops any ongoing recovery activities and that there are no goroutine leaks.


132-135: Ensure proper shutdown of background goroutines.

Closing the stopCh channel signals the background goroutines to terminate. Verify that all goroutines listening to stopCh handle this signal appropriately and exit cleanly. This helps prevent resource leaks and ensures a clean shutdown.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🔭 Outside diff range comments (1)
pkg/transport/nclprotocol/dispatcher/recovery.go (1)

Line range hint 92-134: Timer cleanup needed in error handling path.

While the interruptible timer implementation is good, ensure the timer is cleaned up in all code paths to prevent potential resource leaks.

 		if err := r.watcher.Start(ctx); err != nil {
+			timer.Stop()
 			if r.watcher.Stats().State == watcher.StateRunning {
 				log.Debug().Msg("Watcher already after recovery. Exiting recovery loop.")
 				return
 			}
🧹 Nitpick comments (2)
pkg/transport/nclprotocol/dispatcher/recovery.go (2)

40-40: Consider using make with explicit size for clarity.

While functionally correct, consider being explicit about the channel size for better readability:

-		stopCh:    make(chan struct{}),
+		stopCh:    make(chan struct{}, 0), // unbuffered channel

140-147: Consider additional synchronization for stopCh reinitialization.

While calling stop() first helps, there's still a small window where a new recovery loop could start using the old stopCh. Consider moving the stopCh reinitialization inside the mutex lock:

 func (r *recovery) reset() {
 	r.stop() // Stop any existing recovery first
-
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	r.isRecovering = false
 	r.lastFailure = time.Time{}
 	r.failures = 0
-	r.stopCh = make(chan struct{})
+	// Reinitialize stopCh while holding the lock
+	select {
+	case <-r.stopCh: // drain old channel if not already drained
+	default:
+	}
+	r.stopCh = make(chan struct{})
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ec1ffee and 6f3693b.

📒 Files selected for processing (1)
  • pkg/transport/nclprotocol/dispatcher/recovery.go (5 hunks)
🔇 Additional comments (3)
pkg/transport/nclprotocol/dispatcher/recovery.go (3)

29-30: LGTM! Good use of Go concurrency patterns.

The addition of stopCh and wg follows Go best practices for managing goroutine lifecycle and graceful shutdown.


86-86: LGTM! Proper WaitGroup usage.

Correctly incrementing the WaitGroup counter before launching the goroutine.


159-171: LGTM! Well-implemented shutdown mechanism.

The stop method correctly implements safe channel closing and proper goroutine cleanup:

  • Mutex protection for channel operations
  • Safe handling of multiple close attempts
  • Proper wait for goroutine completion

@wdbaruni wdbaruni merged commit 38572c1 into main Dec 16, 2024
14 checks passed
@wdbaruni wdbaruni deleted the eng-450-fix-race-conditions-resulting-in-multiple-compute-data branch December 16, 2024 09:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant