improve watcher creation pattern and manager naming #4713

Merged
merged 9 commits into from
Nov 21, 2024

Conversation

wdbaruni
Member

@wdbaruni wdbaruni commented Nov 21, 2024

This PR introduces several important changes to the watcher package to unblock changes related to the NCL protocol:

  1. Introduce two-step watcher creation pattern:

    • Split watcher creation and handler configuration for better control
    • Enables explicit handler setup before starting
    • This pattern is needed to create an NCL dispatcher that manages the watcher's handler and restarts the watcher when needed
    • Add validation to prevent starting without handler
    • Improve error handling for invalid states
  2. Fix watcher restart behavior after checkpoints:

    • Ensure proper state restoration when restarting
    • Maintain the correct sequence position after a checkpoint: restart from the latest checkpoint, not from the latest processed event
    • Add tests to verify checkpoint/restart behavior
  3. Rename Registry to Manager:

    • More accurately represents its role in managing watcher lifecycles
    • Ensures consistent terminology throughout the codebase
  4. Additional improvements:

    • Add documentation for new creation pattern
    • Update tests to cover new scenarios
    • Improve error messages and validation

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced a new Manager type to manage multiple watchers, replacing the previous Registry terminology.
    • Enhanced watcher creation with new configuration options for automatic starting and event handler assignment.
    • Added error handling for scenarios where no handler is set or a duplicate watcher ID is used.
  • Documentation

    • Updated README and usage examples to reflect the new Manager terminology and improved clarity on functionality.
  • Bug Fixes

    • Improved error handling related to event processing and watcher lifecycle management.
  • Tests

    • Implemented a comprehensive test suite for the Manager functionality, ensuring robust coverage of various scenarios.

Contributor

coderabbitai bot commented Nov 21, 2024

📥 Commits

Reviewing files that changed from the base of the PR and between b9ffe3d and f7856e3.

📒 Files selected for processing (3)
  • pkg/lib/watcher/manager.go (1 hunks)
  • pkg/lib/watcher/types.go (3 hunks)
  • pkg/lib/watcher/watcher_test.go (27 hunks)

Walkthrough

The pull request introduces significant changes to the Watcher Library, primarily renaming the "Registry" component to "Manager" across various files. This includes updates to method signatures, documentation, and the introduction of new error handling capabilities. Additionally, new configuration options for creating watchers are added, enhancing flexibility. The changes also involve the addition of a new manager type to manage multiple watchers, along with comprehensive test coverage for the new functionality. Several files related to the old registry implementation have been removed, reflecting a complete transition to the new manager-based architecture.

Changes

| File | Change Summary |
| --- | --- |
| pkg/lib/watcher/README.md | Updated terminology from "Registry" to "Manager"; modified method signatures and usage examples; added new configuration options for watchers. |
| pkg/lib/watcher/errors.go | Added new error variables: ErrNoHandler and ErrHandlerExists. |
| pkg/lib/watcher/manager.go | Introduced manager type; added methods for creating, looking up, and stopping watchers; included lifecycle management functionality. |
| pkg/lib/watcher/manager_test.go | Added a comprehensive test suite for the manager functionality, covering various scenarios. |
| pkg/lib/watcher/mocks.go | Updated mock types; renamed methods to align with the new Manager interface. |
| pkg/lib/watcher/options.go | Enhanced watchOptions struct with new fields and functions for handler and auto-start configurations. |
| pkg/lib/watcher/registry.go | Removed file; previously managed watchers using the Registry type. |
| pkg/lib/watcher/registry_test.go | Removed file; contained tests for the old Registry functionality. |
| pkg/lib/watcher/serializer.go | Updated comments to reflect "Manager" instead of "Registry". |
| pkg/lib/watcher/types.go | Expanded Watcher interface; replaced Registry interface with Manager interface. |
| pkg/lib/watcher/watcher.go | Modified watcher struct; updated methods for improved functionality and error handling. |
| pkg/lib/watcher/watcher_test.go | Refactored test suite to improve context management and error handling. |
| pkg/node/compute.go | Updated Compute struct to use watcher.Manager; modified watcher setup logic. |
| pkg/node/requester.go | Changed watcher management from Registry to Manager; refined watcher initialization logic. |
| pkg/orchestrator/evaluation/watcher_test.go | Updated test suite to use watcher.Manager instead of watcher.Registry. |

Possibly related PRs

  • feat: adds agent config command that returns the agent configuration #4671: The changes in the main PR regarding the renaming of "Registry" to "Manager" and the introduction of new methods for managing watchers are directly related to the updates in the pkg/node/compute.go and pkg/node/requester.go files, which also reflect the shift from using a Registry to a Manager for handling watchers.

Suggested reviewers

  • jamlo

🐇 In the world of watchers, new terms we embrace,
From "Registry" to "Manager," we quicken our pace.
With handlers and options, our watchers now thrive,
In this vibrant new structure, our code comes alive!
So hop along, friends, let’s celebrate this cheer,
For clarity and structure, the future is clear! 🐇✨


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 11

🧹 Outside diff range and nitpick comments (21)
pkg/lib/watcher/options.go (3)

24-24: Add documentation for new fields

Consider adding descriptive comments for the new fields to maintain consistency with other fields in the struct:

-	handler              EventHandler  // event handler
+	handler              EventHandler  // handles processing of watched events
-	autoStart            bool
+	autoStart            bool          // controls whether watcher starts immediately after creation

Also applies to: 31-31


36-47: Consider additional handler validation

The validation ensures handler is set when autoStart is true, which is good. However, consider adding validation to prevent setting handler multiple times, as mentioned in the PR objectives about improving error handling for invalid states.

 func (o *watchOptions) validate() error {
 	err := errors.Join(
 		validate.IsGreaterThanZero(o.batchSize, "batchSize must be greater than zero"),
 		validate.IsGreaterOrEqualToZero(o.initialBackoff, "initialBackoff cannot be negative"),
 		validate.IsGreaterOrEqualToZero(o.maxBackoff, "maxBackoff cannot be negative"),
 		validate.IsGreaterOrEqualToZero(o.maxRetries, "maxRetries cannot be negative"),
 		validate.IsGreaterOrEqual(o.maxBackoff, o.initialBackoff, "maxBackoff must be greater than or equal to initialBackoff"))

 	// validate handler is set if autoStart is enabled
 	if o.autoStart && o.handler == nil {
 		err = errors.Join(err, errors.New("handler must be set when autoStart is enabled"))
 	}
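+	// validate handler hasn't been set multiple times
+	// (previousHandler is illustrative only: watchOptions has no such field and would need to track it)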
+	if o.handler != nil && previousHandler != nil {
+		err = errors.Join(err, errors.New("handler has already been set"))
+	}
 	return err
 }

76-81: Add parameter validation

Consider adding validation for the handler parameter to prevent nil handlers from being set.

 func WithHandler(handler EventHandler) WatchOption {
+	if handler == nil {
+		panic("handler cannot be nil")
+	}
 	return func(o *watchOptions) {
 		o.handler = handler
 	}
 }
pkg/orchestrator/evaluation/watcher_test.go (2)

Line range hint 44-50: Consider enhancing error handling test coverage.

The new watcher creation pattern with options is well implemented. However, consider adding test cases for:

  • Watcher creation without a handler
  • Invalid filter configurations
  • Failed start scenarios

Example test case structure:

func (s *WatchHandlerTestSuite) TestWatcherCreationFailures() {
    // Test: Create without handler
    _, err := s.registry.Create(s.ctx, "test-watcher-no-handler")
    s.Require().Error(err)
    
    // Test: Invalid filter
    _, err = s.registry.Create(s.ctx, "test-watcher-invalid-filter",
        watcher.WithHandler(s.watchHandler),
        watcher.WithFilter(watcher.EventFilter{
            ObjectTypes: []string{}, // empty types
        }),
    )
    s.Require().Error(err)
}

Line range hint 29-59: Add test coverage for checkpoint and restart behavior.

The test suite is well-structured but missing coverage for the checkpoint-related changes mentioned in the PR objectives. Consider adding test cases to verify:

  • Watcher restart after checkpoint
  • State restoration post-checkpoint
  • Correct sequence position maintenance

Example test structure:

func (s *WatchHandlerTestSuite) TestWatcherCheckpointRestart() {
    // Create and process some events
    // Perform checkpoint
    // Stop and restart watcher
    // Verify correct sequence position
    // Verify no duplicate processing
}
pkg/lib/watcher/README.md (2)

110-114: Consider adding a note about use cases for each creation pattern.

While both creation patterns are well documented, it would be helpful to explain when to use each approach:

  • Two-step pattern: When you need fine-grained control over the setup process
  • One-step pattern: When immediate start is desired and all configuration is known upfront

73-73: Fix ordered list numbering.

The markdown linter identified inconsistencies in ordered list numbering. The list items should use consistent numbering style.

Apply these changes:

-73. Create a manager:
+2. Create a manager:

-89. Create a watcher and set handler:
+4. Create a watcher and set handler:

Also applies to: 89-89

🧰 Tools
🪛 Markdownlint

73-73: Expected: 1; Actual: 2; Style: 1/1/1
Ordered list item prefix

(MD029, ol-prefix)

pkg/lib/watcher/manager_test.go (3)

1-2: Add a descriptive comment for the build tag

Consider adding a comment explaining the purpose of the build tag to improve maintainability.

-//go:build unit || !integration
+// Package watcher_test contains unit tests for the watcher package.
+// These tests run as part of the unit test suite or when integration tests are disabled.
+//go:build unit || !integration

167-193: Add test cases for error scenarios and concurrent operations

The test suite would benefit from additional test cases:

  1. Handler error scenarios
  2. Concurrent watcher operations
  3. Edge cases for event processing

Example test case for handler errors:

func (s *ManagerTestSuite) TestWatcherHandlerError() {
    ctx := context.Background()
    watcherID := "test-watcher"

    event := watcher.StoreEventRequest{
        Operation: watcher.OperationCreate,
        ObjectType: "TestObject",
        Object: "test1",
    }
    s.Require().NoError(s.mockStore.StoreEvent(ctx, event))

    expectedErr := errors.New("handler error")
    s.mockHandler.EXPECT().
        HandleEvent(gomock.Any(), gomock.Any()).
        Return(expectedErr)

    w, err := s.manager.Create(ctx, watcherID)
    s.Require().NoError(err)
    s.Require().NoError(w.SetHandler(s.mockHandler))
    s.startAndWait(ctx, w)

    s.Require().Eventually(func() bool {
        return w.Stats().Errors > 0
    }, 200*time.Millisecond, 10*time.Millisecond)
}

250-257: Enhance startAndWait helper method

Consider making the helper method more reusable by accepting a desired state parameter and timeout duration.

-func (s *ManagerTestSuite) startAndWait(ctx context.Context, w watcher.Watcher) {
+func (s *ManagerTestSuite) waitForWatcherState(
+    ctx context.Context,
+    w watcher.Watcher,
+    desiredState watcher.State,
+    timeout time.Duration,
+) {
     s.Require().NoError(w.Start(ctx))
 
-    // Ensure the watcher is running
     s.Require().Eventually(func() bool {
-        return w.Stats().State == watcher.StateRunning
-    }, time.Second, 10*time.Millisecond)
+        return w.Stats().State == desiredState
+    }, timeout, 10*time.Millisecond)
 }
pkg/node/compute.go (1)

392-394: Consider extracting common watcher options

The watcher creation pattern looks good, but there's some duplication in the options. Consider extracting common options to reduce duplication:

func setupComputeWatchers(...) (watcher.Manager, error) {
    watcherRegistry := watcher.NewManager(executionStore.GetEventStore())
+   
+   commonOpts := []watcher.WatchOption{
+       watcher.WithAutoStart(),
+       watcher.WithInitialEventIterator(watcher.LatestIterator()),
+   }
+   
+   dispatcherOpts := append(commonOpts,
+       watcher.WithFilter(watcher.EventFilter{
+           ObjectTypes: []string{compute.EventObjectExecutionUpsert},
+       }),
+       watcher.WithRetryStrategy(watcher.RetryStrategySkip),
+       watcher.WithMaxRetries(3),
+   )

    // Execution logger
-   _, err := watcherRegistry.Create(ctx, computeExecutionLoggerWatcherID,
-       watcher.WithHandler(watchers.NewExecutionLogger(log.Logger)),
-       watcher.WithAutoStart(),
-       watcher.WithInitialEventIterator(watcher.LatestIterator()))
+   _, err := watcherRegistry.Create(ctx, computeExecutionLoggerWatcherID,
+       append([]watcher.WatchOption{watcher.WithHandler(watchers.NewExecutionLogger(log.Logger))},
+           commonOpts...)...)

    // ... dispatcher setup ...

-   _, err = watcherRegistry.Create(ctx, computeToOrchestratorDispatcherWatcherID,
-       watcher.WithHandler(dispatcher),
-       watcher.WithAutoStart(),
-       watcher.WithFilter(watcher.EventFilter{...}),
-       watcher.WithRetryStrategy(watcher.RetryStrategySkip),
-       watcher.WithMaxRetries(3),
-       watcher.WithInitialEventIterator(watcher.LatestIterator()))
+   _, err = watcherRegistry.Create(ctx, computeToOrchestratorDispatcherWatcherID,
+       append([]watcher.WatchOption{watcher.WithHandler(dispatcher)},
+           dispatcherOpts...)...)

    // ... execution handler setup ...

-   _, err = watcherRegistry.Create(ctx, computeExecutionHandlerWatcherID,
-       watcher.WithHandler(executionHandler),
-       watcher.WithAutoStart(),
-       watcher.WithFilter(watcher.EventFilter{...}),
-       watcher.WithRetryStrategy(watcher.RetryStrategySkip),
-       watcher.WithMaxRetries(3),
-       watcher.WithInitialEventIterator(watcher.LatestIterator()))
+   _, err = watcherRegistry.Create(ctx, computeExecutionHandlerWatcherID,
+       append([]watcher.WatchOption{watcher.WithHandler(executionHandler)},
+           dispatcherOpts...)...)

Also applies to: 409-411, 424-426
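One detail worth keeping in mind when sharing functional options: Go does not allow passing individual variadic arguments together with a spread slice in the same call, and appending to a shared slice can alias its backing array. The sketch below shows one way to combine shared and per-call options safely. All names here (`config`, `Option`, `combine`, `newConfig`) are hypothetical and unrelated to the watcher package's real types.

```go
package main

import "fmt"

// config is a hypothetical configuration target.
type config struct {
	name       string
	autoStart  bool
	maxRetries int
}

// Option mutates a config, in the usual functional-options style.
type Option func(*config)

func WithName(n string) Option    { return func(c *config) { c.name = n } }
func WithAutoStart() Option       { return func(c *config) { c.autoStart = true } }
func WithMaxRetries(n int) Option { return func(c *config) { c.maxRetries = n } }

// newConfig applies the options in order.
func newConfig(opts ...Option) config {
	var c config
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

// combine copies shared into a fresh slice before appending extra, so the
// shared slice is never modified and its backing array is never reused.
func combine(shared []Option, extra ...Option) []Option {
	out := make([]Option, 0, len(shared)+len(extra))
	out = append(out, shared...)
	return append(out, extra...)
}

func main() {
	common := []Option{WithAutoStart()}

	logger := newConfig(combine(common, WithName("logger"))...)
	dispatcher := newConfig(combine(common, WithName("dispatcher"), WithMaxRetries(3))...)

	fmt.Printf("%+v\n", logger)
	fmt.Printf("%+v\n", dispatcher)
}
```

A call such as `newConfig(WithName("x"), common...)` would not compile, which is why the options are merged into a single slice first.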

pkg/lib/watcher/manager.go (1)

100-103: Consider collecting errors from watcher shutdown

Currently, any errors returned by w.Stop(timeoutCtx) in the goroutines are not handled. To improve error handling and observability, consider collecting these errors and logging or aggregating them. This ensures that any issues during watcher shutdown are captured and can be addressed.

pkg/lib/watcher/types.go (3)

22-22: Clarify the purpose of CheckpointIterator in Stats

The CheckpointIterator field seems to mirror NextEventIterator in the Stats struct. Consider elaborating on how CheckpointIterator is used differently from NextEventIterator to enhance understanding and maintainability.


59-60: Document side effects when using SeekToOffset during runtime

Calling SeekToOffset will stop and restart the watcher if it's running. This may interrupt event processing and could lead to missed or duplicated events. Consider adding details about these side effects in the documentation and ensure that the implementation handles these scenarios gracefully.


80-80: Align error handling for Stop methods

The Stop method in the Manager interface returns an error, whereas in the Watcher interface, Stop does not return an error. For consistency and to handle potential shutdown errors, consider having the Watcher.Stop method return an error.

pkg/lib/watcher/mocks.go (1)

213-216: Inconsistent method naming: LookupWatcher vs. Lookup

The method Lookup is correctly implemented, but the comment refers to LookupWatcher. Align the comment with the method name.

Apply this diff to correct the comment:

-// LookupWatcher mocks base method.
+// Lookup mocks base method.

 func (m *MockRegistry) Lookup(watcherID string) (Watcher, error) {
     // Method implementation
 }
pkg/node/requester.go (2)

484-484: Implement checkpointing to prevent missed events

There's a TODO comment indicating that checkpointing needs to be added to prevent events from being missed:

    // TODO: Add checkpointing or else events will be missed

Implementing checkpointing is important to ensure reliable event processing and to avoid missing critical events during restarts or failures.

Would you like assistance in designing and implementing the checkpointing mechanism, or should I open a new GitHub issue to track this task?


486-486: Redundant WithMaxRetries when using RetryStrategySkip

In the watcher creation on line 486, watcher.WithRetryStrategy(watcher.RetryStrategySkip) is used along with watcher.WithMaxRetries(3). Since the retry strategy is set to skip retries, specifying a maximum number of retries is redundant. Consider removing WithMaxRetries to simplify the code.

Apply this diff to remove the redundant parameter:

    		watcher.WithHandler(dispatcher),
    		watcher.WithAutoStart(),
    		watcher.WithFilter(watcher.EventFilter{
    			ObjectTypes: []string{jobstore.EventObjectExecutionUpsert},
    		}),
    		watcher.WithRetryStrategy(watcher.RetryStrategySkip),
-    		watcher.WithMaxRetries(3),
    		watcher.WithInitialEventIterator(watcher.LatestIterator()))
pkg/lib/watcher/watcher_test.go (3)

42-42: Consider parameterizing the context timeout

The fixed 5-second timeout in SetupTest may not be sufficient for all test environments, especially under heavy load or slower systems. Consider parameterizing the timeout duration or using a longer timeout to avoid unintended test failures.


933-939: Ensure goroutine safety when interacting with channels

In TestStopStates, when setting up the getEventsCh channel and the WithGetEventsInterceptor, ensure that channel operations are properly synchronized to avoid potential deadlocks or race conditions.


Line range hint 178-188: Handle potential errors when storing events in tests

While storing events in the loop, consider handling potential errors within the loop to prevent the test from continuing in an invalid state if an error occurs.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between f40183b and 2306540.

📒 Files selected for processing (15)
  • pkg/lib/watcher/README.md (6 hunks)
  • pkg/lib/watcher/errors.go (1 hunks)
  • pkg/lib/watcher/manager.go (1 hunks)
  • pkg/lib/watcher/manager_test.go (1 hunks)
  • pkg/lib/watcher/mocks.go (5 hunks)
  • pkg/lib/watcher/options.go (2 hunks)
  • pkg/lib/watcher/registry.go (0 hunks)
  • pkg/lib/watcher/registry_test.go (0 hunks)
  • pkg/lib/watcher/serializer.go (2 hunks)
  • pkg/lib/watcher/types.go (3 hunks)
  • pkg/lib/watcher/watcher.go (8 hunks)
  • pkg/lib/watcher/watcher_test.go (27 hunks)
  • pkg/node/compute.go (4 hunks)
  • pkg/node/requester.go (3 hunks)
  • pkg/orchestrator/evaluation/watcher_test.go (2 hunks)
💤 Files with no reviewable changes (2)
  • pkg/lib/watcher/registry.go
  • pkg/lib/watcher/registry_test.go
✅ Files skipped from review due to trivial changes (1)
  • pkg/lib/watcher/serializer.go
🧰 Additional context used
🪛 Markdownlint
pkg/lib/watcher/README.md

73-73: Expected: 1; Actual: 2; Style: 1/1/1
Ordered list item prefix

(MD029, ol-prefix)


89-89: Expected: 1; Actual: 4; Style: 1/1/1
Ordered list item prefix

(MD029, ol-prefix)

🪛 LanguageTool
pkg/lib/watcher/README.md

[uncategorized] ~135-~135: Loose punctuation mark.
Context: ...d. - WithHandler(handler EventHandler): Sets the event handler for the watcher....

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~136-~136: Loose punctuation mark.
Context: ...ler for the watcher. - WithAutoStart(): Enables automatic start of the watcher ...

(UNLIKELY_OPENING_PUNCTUATION)

🔇 Additional comments (32)
pkg/lib/watcher/errors.go (1)

12-13: LGTM! The new error definitions align well with the two-step watcher creation pattern.

The error messages are clear and follow the existing conventions. They support the validation requirements mentioned in the PR objectives.

Let's verify the usage of these new errors in the codebase:

✅ Verification successful

Let me gather more context about the error handling implementation in the watcher code to ensure proper validation.


The new error definitions are properly implemented and used in the codebase

The verification confirms:

  • ErrHandlerExists is used to prevent duplicate handler registration in SetHandler
  • ErrNoHandler validates the handler presence before watcher starts
  • Both errors have corresponding test coverage
  • The implementation follows proper mutex locking patterns and error wrapping
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that the new errors are properly used in the manager implementation
# Expected: Usage in handler configuration validation

# Check for ErrNoHandler usage
echo "Checking ErrNoHandler usage:"
rg "ErrNoHandler" --type go

# Check for ErrHandlerExists usage
echo "Checking ErrHandlerExists usage:"
rg "ErrHandlerExists" --type go

# Look for related error handling patterns
echo "Checking handler-related error handling:"
ast-grep --pattern 'if $_ != nil {
  return $_, ErrNoHandler
}'

Length of output: 734


Script:

#!/bin/bash
# Check the implementation context of error handling in watcher.go
echo "Checking watcher.go implementation:"
rg -B 3 -A 3 "ErrNoHandler|ErrHandlerExists" pkg/lib/watcher/watcher.go

# Check test coverage for these error conditions
echo "Checking test coverage:"
rg -B 3 -A 3 "ErrNoHandler|ErrHandlerExists" pkg/lib/watcher/watcher_test.go

# Look for handler-related validation checks
echo "Checking handler validation patterns:"
ast-grep --pattern 'if $_.handler == nil {
  $$$
}'

Length of output: 1053

pkg/lib/watcher/options.go (1)

62-67: LGTM!

The WithAutoStart option is well-implemented and follows the established pattern for option functions.

pkg/orchestrator/evaluation/watcher_test.go (1)

25-25: LGTM: Registry to Manager rename is properly implemented.

The type change from watcher.Registry to watcher.Manager and corresponding initialization update are consistent with the PR objectives.

Also applies to: 40-40

pkg/lib/watcher/README.md (3)

23-23: LGTM: Component renaming is clear and consistent.

The renaming from "Registry" to "Manager" is well documented and accurately reflects the component's role in managing watcher lifecycles.

Also applies to: 45-47


89-108: Excellent documentation of the two-step watcher creation pattern.

The documentation clearly explains the new two-step creation pattern, which aligns with the PR's objective of separating watcher creation from handler configuration. This pattern provides better control over the setup process.



135-136: LGTM: New configuration options are well documented.

The new configuration options WithHandler and WithAutoStart are clearly documented with their purposes.


pkg/node/compute.go (2)

388-389: LGTM! Return type change aligns with new watcher management pattern

The return type change from Registry to Manager is consistent with the PR objectives of improving the watcher creation pattern.


47-47: Verify external usage of the Watchers field

The type change from Registry to Manager is a breaking change that could affect external code accessing this field.

✅ Verification successful

Type change from Registry to Manager appears safe

The change of the Watchers field type from Registry to Manager appears to be an internal refactoring with no external impact. Extensive searches through the codebase show:

  1. No direct access to the Watchers field from outside the package
  2. The Compute struct is only used in test files and internal implementations
  3. The field is not exposed in any public APIs or configurations
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for external usage of the Watchers field
rg -g '!pkg/node/compute.go' "Compute.*Watchers" 

Length of output: 50


Script:

#!/bin/bash
# Let's try a broader search for any usage of the Compute struct
rg "type.*Compute.*struct" -A 5

# Also search for any direct usage of Compute
rg "(\*Compute|Compute\.) " -A 2

Length of output: 7581


Script:

#!/bin/bash
# Let's search for any direct access to the Watchers field or any watcher-related method calls
rg "(\*Compute).*\.Watchers" -A 2

# Also search for any method that might be using the Watchers field
ast-grep --pattern 'func \($_\) \*Compute\) $_($$$) {
  $$$
  $_.Watchers
  $$$
}'

Length of output: 120

pkg/lib/watcher/watcher.go (9)

35-36: Introducing the New constructor function enhances clarity and accessibility

The addition of the New constructor function, transitioning from the unexported newWatcher, makes the watcher creation more accessible to external packages. This change improves the API's usability while maintaining encapsulation.


65-70: Flexible handler configuration during watcher initialization

By conditionally setting the handler if provided in the options, the code allows for greater flexibility. This enables users to configure the watcher with or without an initial handler, enhancing modularity.


125-140: Addition of SetHandler method for dynamic handler assignment

The new SetHandler method allows setting or updating the event handler post-initialization. This promotes modularity and reusability. Proper synchronization with mutex locks and error handling ensures thread safety and prevents accidental handler overwrites.


143-154: Enhanced error handling in Start method for watcher state and handler

The updated Start method now includes checks for the watcher's state and verifies that a handler is set before starting. Returning appropriate errors when the watcher is in an invalid state or lacks a handler prevents runtime issues.


155-158: Proper initialization of context and channels in Start

Creating a new cancelable context and reinitializing the stopped channel ensures that each watcher start has a fresh context and synchronization mechanism. This is crucial for correct start-stop cycles.


170-176: Implementation of the run method encapsulates the event loop effectively

Extracting the main event processing loop into the run method improves code organization and readability. The deferred function ensures that the watcher’s state is updated, and resources are cleaned up when the loop exits.


283-288: Safe and idempotent Stop method implementation

The Stop method now safely handles multiple stop calls by checking the watcher's state and avoids redundant operations. This prevents potential panics from channel operations and ensures graceful shutdowns.


307-318: Updating checkpointIterator after checkpointing enhances consistency

By updating the checkpointIterator upon successful checkpoint storage, the watcher ensures that it resumes from the correct position after restarts. Mutex locks prevent race conditions during this update.


323-336: Improved SeekToOffset with restart logic ensures correct watcher state

The SeekToOffset method now stops the watcher, updates the checkpoint, and restarts it. This sequence ensures that the watcher applies the offset correctly and resumes operation without inconsistencies.

pkg/lib/watcher/mocks.go (8)

79-85: SetHandler method successfully added to MockWatcher

The SetHandler method has been correctly implemented in MockWatcher, matching the interface requirements.


87-91: SetHandler recorder method correctly implemented

The recorder for the SetHandler method in MockWatcherMockRecorder has been properly added, ensuring accurate recording of expected calls.


93-99: Start method successfully added to MockWatcher

The Start method implementation in MockWatcher aligns with the expected interface and is correctly mocked.


102-105: Start recorder method correctly implemented

The recorder for the Start method in MockWatcherMockRecorder is properly added, facilitating accurate expectation setting.


295-297: Parameter renaming in GetEvents improves clarity

The GetEvents method now uses request instead of params as a parameter name, enhancing readability and consistency.


304-306: Recorder method updated with renamed parameter

The recorder method for GetEvents correctly reflects the parameter name change to request, maintaining consistency.


339-341: Parameter renaming in StoreEvent enhances consistency

The StoreEvent method now uses request instead of event, which aligns with the method's purpose and improves clarity.


347-349: Recorder method updated for StoreEvent

The recorder method for StoreEvent correctly uses the updated parameter name request, ensuring consistency.

pkg/node/requester.go (3)

434-435: Update return type to watcher.Manager

The function setupOrchestratorWatchers now returns watcher.Manager instead of watcher.Registry, reflecting the new pattern for watcher management. This change is appropriate and aligns with the updates in the watcher package.


438-440: Correct initialization of the evaluation watcher

The evaluation watcher is correctly created using the Create method with the handler and WithAutoStart. The use of watcher.LatestIterator() ensures it starts from the latest events. This implementation conforms to the new watcher creation pattern.


452-454: Proper setup of the execution logger watcher

The execution logger watcher is appropriately created with the correct handler and is set to auto-start. The initialization parameters are correctly specified.

pkg/lib/watcher/watcher_test.go (4)

24-25: Enhancement: Add context and cancel function to WatcherTestSuite

Adding ctx and cancel fields to WatcherTestSuite enables better control over test execution and resource management, ensuring that contexts are properly managed throughout the tests.


42-42: Initialize test context with timeout in SetupTest

Initializing s.ctx with a timeout of 5 seconds in SetupTest prevents tests from hanging indefinitely and ensures timely cancellation of operations.


50-50: Ensure proper cancellation of context in TearDownTest

Calling s.cancel() in TearDownTest ensures that all resources are properly released after each test, preventing potential leaks or unintended side effects.
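
The setup and teardown pairing described in these comments, sketched without a test framework. `sketchSuite` is a hypothetical stand-in for WatcherTestSuite; only the ctx and cancel fields and the 5-second timeout come from the review text.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sketchSuite mirrors the ctx/cancel fields without depending on a test
// framework.
type sketchSuite struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// SetupTest gives every test its own bounded context.
func (s *sketchSuite) SetupTest() {
	s.ctx, s.cancel = context.WithTimeout(context.Background(), 5*time.Second)
}

// TearDownTest releases the timer and cancels anything still using ctx.
func (s *sketchSuite) TearDownTest() {
	s.cancel()
}

// lifecycle reports whether the context is live during the test and
// cancelled after teardown.
func lifecycle() (liveDuring, cancelledAfter bool) {
	s := &sketchSuite{}
	s.SetupTest()
	liveDuring = s.ctx.Err() == nil
	s.TearDownTest()
	cancelledAfter = s.ctx.Err() == context.Canceled
	return
}

func main() {
	fmt.Println(lifecycle()) // true true
}
```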


Line range hint 54-77: Consistent use of test context across all test cases

Updating all test cases to use s.ctx ensures that each test operates within the controlled context, adhering to the timeout and cancellation policies defined in SetupTest. This enhances consistency and prevents tests from interfering with each other.

Also applies to: 79-93, 94-99, 204-219, 240-245, 256-270, 287-301, 312-324, 342-350, 370-387, 398-422, 486-508, 539-547, 561-604, 648-652, 667-683, 707-726, 780-792, 805-820, 826-856, 861-893, 896-969, 973-994

wdbaruni and others added 3 commits November 21, 2024 10:12
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
pkg/lib/watcher/manager_test.go (2)

154-158: Clarify the comment or adjust the code to reflect waiting behavior

The comment // sleep and verify that the watcher is still stopping suggests a sleep, but the code uses Eventually to verify the state remains StateStopping. To improve readability and maintainability, consider either updating the comment to accurately reflect the use of Eventually or introducing a brief time.Sleep to match the comment.

Option 1: Update the comment to reflect the current code:

-	// sleep and verify that the watcher is still stopping
+	// Ensure the watcher remains in the stopping state

Option 2: Use time.Sleep to align with the original comment:

-	s.Require().Eventually(func() bool {
-		return w.Stats().State == watcher.StateStopping
-	}, 200*time.Millisecond, 10*time.Millisecond)
+	time.Sleep(100 * time.Millisecond)
+	s.Require().Equal(watcher.StateStopping, w.Stats().State)
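
One caveat when choosing between the two options: testify's Eventually passes as soon as the condition first holds, so it shows that the state was reached, not that it persists. The stdlib-only sketch below contrasts the two kinds of check; `eventually`, `holdsFor`, and the 30ms state flip are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// eventually polls cond every tick and returns true as soon as it holds,
// or false once waitFor has elapsed.
func eventually(cond func() bool, waitFor, tick time.Duration) bool {
	deadline := time.Now().Add(waitFor)
	for {
		if cond() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(tick)
	}
}

// holdsFor polls cond every tick and returns false as soon as it fails
// within the given duration.
func holdsFor(cond func() bool, duration, tick time.Duration) bool {
	deadline := time.Now().Add(duration)
	for time.Now().Before(deadline) {
		if !cond() {
			return false
		}
		time.Sleep(tick)
	}
	return cond()
}

// demo models a watcher that is "stopping" now but leaves that state
// after 30ms.
func demo() (reached, stayed bool) {
	var stopping int32 = 1
	timer := time.AfterFunc(30*time.Millisecond, func() {
		atomic.StoreInt32(&stopping, 0)
	})
	defer timer.Stop()
	isStopping := func() bool { return atomic.LoadInt32(&stopping) == 1 }
	reached = eventually(isStopping, 200*time.Millisecond, 10*time.Millisecond)
	stayed = holdsFor(isStopping, 100*time.Millisecond, 10*time.Millisecond)
	return
}

func main() {
	fmt.Println(demo()) // true false
}
```

If the test really needs to prove the watcher stays in the stopping state, a persistence-style check (or the sleep-then-assert form in Option 2) expresses that intent more directly.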

230-231: Use s.Require().Equal instead of s.Equal for consistency

Throughout the tests, s.Require() is used so that a failed assertion stops the test immediately. On line 230, s.Equal() is used, which lets execution continue after a failure. For consistency and to prevent cascading failures, consider using s.Require().Equal().

Apply this diff to use Require:

-	s.Equal(watcher.StateRunning, w2.Stats().State)
+	s.Require().Equal(watcher.StateRunning, w2.Stats().State)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 2306540 and c8735e1.

📒 Files selected for processing (1)
  • pkg/lib/watcher/manager_test.go (1 hunks)
🔇 Additional comments (1)
pkg/lib/watcher/manager_test.go (1)

36-39: ⚠️ Potential issue

Fix potential race condition in TearDownTest

Finalizing the mock controller before stopping the manager could lead to race conditions if the manager is still interacting with the mocks during shutdown. Swap the order to ensure the manager stops before the mocks are finished.

Apply this diff to fix the potential race condition:

 func (s *ManagerTestSuite) TearDownTest() {
-	s.ctrl.Finish()
 	s.Require().NoError(s.manager.Stop(context.Background()), "failed to stop manager in teardown")
+	s.ctrl.Finish()
 }

Likely invalid or redundant comment.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (6)
pkg/lib/watcher/types.go (1)

71-81: Enhance Manager interface documentation

Please enhance the interface documentation to include:

  1. Concurrent access guarantees for shared resources
  2. Recommended context timeout values
  3. Specific error conditions for each method
  4. Cleanup behavior for orphaned watchers

Consider adding package-level documentation about the Manager's role in resource management and typical usage patterns.

pkg/lib/watcher/README.md (2)

45-47: Consider enhancing the Manager section documentation.

While the description is accurate, consider adding details about the Manager's role in the two-step watcher creation pattern, specifically mentioning its responsibility in:

  • Supporting separate watcher creation and handler configuration
  • Managing watcher lifecycle states
  • Ensuring proper validation of watcher configuration
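
As a starting point for such documentation, a minimal sketch of what the Manager's responsibilities could look like in code. Everything here (`sketchManager`, `errDuplicateID`, the integer return of `Stop`) is an assumption for illustration and does not reflect the package's actual signatures.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDuplicateID is a hypothetical error for this sketch.
var errDuplicateID = errors.New("watcher ID already registered")

type sketchWatcher struct {
	id      string
	running bool
}

// sketchManager owns the lifecycle of every watcher it creates.
type sketchManager struct {
	mu       sync.Mutex
	watchers map[string]*sketchWatcher
}

func newManager() *sketchManager {
	return &sketchManager{watchers: make(map[string]*sketchWatcher)}
}

// Create registers a new, not yet started watcher under a unique ID.
func (m *sketchManager) Create(id string) (*sketchWatcher, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, exists := m.watchers[id]; exists {
		return nil, errDuplicateID
	}
	w := &sketchWatcher{id: id}
	m.watchers[id] = w
	return w, nil
}

// Stop stops every running watcher and reports how many it stopped.
func (m *sketchManager) Stop() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	stopped := 0
	for _, w := range m.watchers {
		if w.running {
			w.running = false
			stopped++
		}
	}
	return stopped
}

func main() {
	m := newManager()
	w, _ := m.Create("evaluations")
	w.running = true // stands in for SetHandler followed by Start
	_, err := m.Create("evaluations")
	fmt.Println(err)      // watcher ID already registered
	fmt.Println(m.Stop()) // 1
	fmt.Println(m.Stop()) // 0
}
```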

89-114: LGTM! Clear examples of both creation patterns.

The examples effectively demonstrate both the two-step and one-step creation patterns. The code is well-structured and matches the actual implementation.

Consider adding a note about when to prefer each pattern, for example:

  • Use two-step creation when handler configuration needs to be determined dynamically
  • Use one-step creation for simpler use cases with known handler configuration
🧰 Tools
🪛 Markdownlint

89-89: Expected: 1; Actual: 4; Style: 1/1/1
Ordered list item prefix

(MD029, ol-prefix)

pkg/lib/watcher/watcher_test.go (3)

927-1000: LGTM: Thorough testing of stop states with a minor suggestion

The test cases comprehensively cover:

  • Stopping a running watcher
  • Stopping an already stopped watcher
  • Stopping an unstarted watcher
  • Concurrent stop operations

Consider reducing the timeout in Eventually assertions from 200ms to a smaller value (e.g., 100ms) for faster test execution, as state transitions should happen quickly in tests.

-		}, 200*time.Millisecond, 10*time.Millisecond)
+		}, 100*time.Millisecond, 10*time.Millisecond)

1010-1019: Improve helper method names for clarity

The helper methods are well-implemented but their names could be more descriptive:

  • startAndWait could be renamed to startWatcherAndWaitForProcessing
  • startWaitAndStop could be renamed to runWatcherUntilCompletion

This would make their purposes more immediately clear to readers.


1004-1008: Add descriptive messages to Eventually assertions

Consider adding descriptive failure messages to Eventually assertions to make test failures more debuggable.

 func (s *WatcherTestSuite) wait(ctx context.Context, w watcher.Watcher, continuationSeqNum uint64) {
 	s.Require().Eventually(func() bool {
 		return w.Stats().State == watcher.StateRunning &&
 			w.Stats().NextEventIterator.SequenceNumber == continuationSeqNum
-	}, 1*time.Second, 10*time.Millisecond)
+	}, 1*time.Second, 10*time.Millisecond, "Watcher failed to reach expected state: running with sequence %d", continuationSeqNum)
 }

Also applies to: 1023-1025

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 747a85a and b9ffe3d.

📒 Files selected for processing (6)
  • pkg/lib/watcher/README.md (6 hunks)
  • pkg/lib/watcher/manager.go (1 hunks)
  • pkg/lib/watcher/manager_test.go (1 hunks)
  • pkg/lib/watcher/mocks.go (4 hunks)
  • pkg/lib/watcher/types.go (3 hunks)
  • pkg/lib/watcher/watcher_test.go (27 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/lib/watcher/manager.go
  • pkg/lib/watcher/manager_test.go
🧰 Additional context used
🪛 Markdownlint
pkg/lib/watcher/README.md

73-73: Expected: 1; Actual: 2; Style: 1/1/1
Ordered list item prefix

(MD029, ol-prefix)


89-89: Expected: 1; Actual: 4; Style: 1/1/1
Ordered list item prefix

(MD029, ol-prefix)

🔇 Additional comments (10)
pkg/lib/watcher/types.go (3)

22-22: LGTM: Well-documented checkpoint iterator field addition

The new field aligns with the PR's objective to improve checkpoint handling and follows consistent patterns.


43-50: Document thread safety guarantees and state transitions

While the two-step pattern is well-implemented, the thread safety aspects need clarification:

  1. Document whether SetHandler and Start are thread-safe
  2. Specify the valid state transitions (e.g., can SetHandler be called while Starting?)
  3. Consider adding state validation to prevent invalid transitions

The previous review comment about thread safety and race conditions between SetHandler and Start is still valid.
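
One way to make the valid transitions explicit is a small lookup table, sketched below. The state names and the rule that a handler may only be set while idle or stopped are assumptions for discussion, not the package's documented behavior.

```go
package main

import "fmt"

type state string

// Hypothetical state names for this sketch.
const (
	idle     state = "idle"
	running  state = "running"
	stopping state = "stopping"
	stopped  state = "stopped"
)

// allowed lists, for each state, the states it may move to next.
var allowed = map[state][]state{
	idle:     {running},
	running:  {stopping},
	stopping: {stopped},
	stopped:  {running},
}

// canTransition reports whether moving from one state to another is valid.
func canTransition(from, to state) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

// canSetHandler assumes a handler may only change while nothing is running.
func canSetHandler(s state) bool {
	return s == idle || s == stopped
}

func main() {
	fmt.Println(canTransition(idle, running))     // true
	fmt.Println(canTransition(stopping, running)) // false
	fmt.Println(canSetHandler(running))           // false
}
```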


59-59: Clarify atomicity guarantees of SeekToOffset

The documentation should specify:

  1. Whether the stop-seek-restart sequence is atomic
  2. How in-flight events are handled
  3. Impact on checkpoint state

Let's check for any existing tests covering this scenario:

✅ Verification successful

Let me gather more context about the SeekToOffset implementation and its behavior.


Let me search for the SeekToOffset implementation and any related code that handles the watcher state.


Based on the code analysis, I can now provide a final response about the atomicity and data handling of SeekToOffset.

Documentation should be updated to clarify the implementation details

The implementation in pkg/lib/watcher/watcher.go shows that SeekToOffset:

  1. Stops the watcher first to prevent event processing during offset update
  2. Persists the new offset via checkpoint mechanism
  3. Restarts the watcher if it was previously running

The sequence is safe but not atomic, and events could be processed twice after restart. This is evidenced by the test TestSeekToOffset, which expects the last event to be processed twice.

Suggested documentation update:

// SeekToOffset updates the watcher's position to start consuming from after the specified sequence number.
// Will stop and restart the watcher if running. Note that:
// 1. The stop-seek-restart sequence is not atomic
// 2. Events at the new offset may be processed twice after restart
// 3. Checkpoint state is updated to persist the new position
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for tests related to SeekToOffset behavior
rg -A 5 "TestSeekToOffset|SeekToOffset.*running" --type go

Length of output: 598


Script:

#!/bin/bash
# Get the full test implementation and related code
rg -B 5 -A 20 "TestSeekToOffset" --type go

# Look for SeekToOffset implementation
ast-grep --pattern 'func $_($_, $_) SeekToOffset($$$) {
  $$$
}'

Length of output: 1975


Script:

#!/bin/bash
# Look for SeekToOffset implementation and related methods
rg -A 10 "func.*SeekToOffset|func.*Stop|func.*Start" --type go

# Look for any checkpoint or state handling code
rg -A 5 "checkpoint|handleEvent|processEvents" --type go

Length of output: 98447

pkg/lib/watcher/README.md (2)

23-23: LGTM! Component renaming is clear and accurate.

The renaming from "Registry" to "Manager" is well documented and aligns with the architectural changes.


135-136: LGTM! Configuration options are well documented.

The new configuration options WithHandler and WithAutoStart are clearly documented and support both watcher creation patterns.

pkg/lib/watcher/mocks.go (3)

79-105: LGTM: New methods support two-step watcher creation pattern

The new SetHandler and Start methods are correctly implemented following GoMock patterns and support the PR's objective of separating watcher creation from handler configuration.


170-239: LGTM: Manager implementation aligns with new architecture

The renamed MockManager and its methods are well-implemented, with proper handling of variadic options in Create and correct error handling in all methods. This change aligns with the PR's objective of renaming Registry to Manager for better terminology consistency.


295-306: LGTM: Consistent parameter naming in EventStore methods

Renaming the parameters of GetEvents and StoreEvent to request makes the EventStore mock methods consistent with each other.

Also applies to: 339-349

pkg/lib/watcher/watcher_test.go (2)

24-25: LGTM: Well-structured context management

The addition of context management with a 5-second timeout is a good practice for test execution control and resource cleanup.

Also applies to: 42-42, 50-50


305-334: LGTM: Comprehensive test coverage for handler and autoStart options

The test cases thoroughly cover:

  • Error handling when autoStart is enabled without a handler
  • Automatic start behavior when both handler and autoStart are set
  • Default behavior with handler only
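
The three cases above map onto a functional-options constructor roughly like the following sketch. The option names WithHandler and WithAutoStart come from the PR; their signatures, the `Create` function shape, and `errNoHandler` are assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
)

type handler func(event string)

// errNoHandler is a hypothetical error for this sketch.
var errNoHandler = errors.New("autostart requires a handler")

type options struct {
	handler   handler
	autoStart bool
}

// Option mutates the options collected by Create.
type Option func(*options)

func WithHandler(h handler) Option { return func(o *options) { o.handler = h } }
func WithAutoStart() Option        { return func(o *options) { o.autoStart = true } }

type sketchWatcher struct {
	handler handler
	running bool
}

// Create applies the options, rejects autostart without a handler, and
// starts the watcher only when autostart was requested.
func Create(opts ...Option) (*sketchWatcher, error) {
	var o options
	for _, opt := range opts {
		opt(&o)
	}
	if o.autoStart && o.handler == nil {
		return nil, errNoHandler
	}
	return &sketchWatcher{handler: o.handler, running: o.autoStart}, nil
}

func main() {
	h := func(string) {}

	_, err := Create(WithAutoStart())
	fmt.Println(err) // autostart requires a handler

	w, _ := Create(WithHandler(h), WithAutoStart())
	fmt.Println(w.running) // true

	w, _ = Create(WithHandler(h))
	fmt.Println(w.running) // false: caller starts it explicitly later
}
```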
