Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add persisted watcher library #4307

Merged
merged 9 commits into from
Aug 8, 2024
Merged

add persisted watcher library #4307

merged 9 commits into from
Aug 8, 2024

Conversation

wdbaruni
Copy link
Member

@wdbaruni wdbaruni commented Aug 7, 2024

Watcher Library

Overview

The Watcher Library is an internal component of the Bacalhau project that provides a robust event watching and processing system. It's designed to efficiently store, retrieve, and process events. The library ensures events are stored in a durable, ordered manner, allowing for consistent and reliable event processing. It supports features like checkpointing, filtering, and long-polling, while maintaining the ability to replay events from any point in the event history.

Key Features

  1. Ordered Event Processing: Events are processed in the exact order they were created, ensuring consistency and predictability in event handling.
  2. Durability: Events are stored persistently in BoltDB, ensuring they survive system restarts or crashes.
  3. Replayability: The system allows replaying events from any point in history, facilitating data recovery, debugging, and system reconciliation.
  4. Concurrency: Multiple watchers can process events concurrently, improving system throughput.
  5. Filtering: Watchers can filter events based on object types and operations, allowing for targeted event processing.
  6. Checkpointing: Watchers can save their progress and resume from where they left off, enhancing reliability and efficiency.
  7. Long-polling: Efficient event retrieval with support for long-polling, reducing unnecessary network traffic and database queries.
  8. Garbage Collection: Automatic cleanup of old events to manage storage while maintaining the ability to replay from critical points.
  9. Flexible Event Iteration: Different types of iterators for various use cases, including the ability to start from the oldest event, the latest event, or any specific point in the event history.

Key Components

  1. Registry: Manages multiple watchers and provides methods to create and manage watchers.
  2. Watcher: Represents a single event watcher that processes events sequentially.
  3. EventStore: Responsible for storing and retrieving events, with BoltDB as the default implementation.
  4. EventHandler: Interface for handling individual events.
  5. Serializer: Handles the serialization and deserialization of events.

Core Concepts

Event

An Event represents a single occurrence in the system. It has the following properties:

  • SeqNum: A unique, sequential identifier for the event.
  • Operation: The type of operation (Create, Update, Delete).
  • ObjectType: The type of object the event relates to.
  • Object: The actual data associated with the event.
  • Timestamp: When the event occurred.

EventStore

The EventStore is responsible for persisting events and providing methods to retrieve them. It uses BoltDB as the underlying storage engine and supports features like caching, checkpointing, and garbage collection.

Registry

The Registry manages multiple watchers. It's the main entry point for components that want to subscribe to events.

Watcher

A Watcher represents a single subscriber to events. It processes events sequentially and can be configured with filters and checkpoints.

EventIterator

An EventIterator defines the starting position for reading events. There are four types of iterators:

  1. TrimHorizonIterator: Starts from the oldest available event.
  2. LatestIterator: Starts from the latest available event.
  3. AtSequenceNumberIterator: Starts at a specific sequence number.
  4. AfterSequenceNumberIterator: Starts after a specific sequence number.

Usage

Here's how you typically use the Watcher library within Bacalhau:

  1. Create an EventStore:
db, _ := bbolt.Open("events.db", 0600, nil)
store, _ := boltdb.NewEventStore(db)
  1. Create a Registry:
registry := watcher.NewRegistry(store)
  1. Implement an EventHandler:
type MyHandler struct{}

func (h *MyHandler) HandleEvent(ctx context.Context, event watcher.Event) error {
    // Process the event
    return nil
}
  1. Start watching for events:
watcher, _ := registry.Watch(ctx, "my-watcher", &MyHandler{}, 
    watcher.WithFilter(watcher.EventFilter{
        ObjectTypes: []string{"Job", "Execution"},
        Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
    }),
)
  1. Store events:
store.StoreEvent(ctx, watcher.OperationCreate, "Job", jobData)

Configuration

Watch Configuration

When creating a watcher, you can configure it with various options:

  • WithInitialEventIterator(iterator EventIterator): Sets the starting position for watching if no checkpoint is found.
  • WithFilter(filter EventFilter): Sets the event filter for watching.
  • WithBufferSize(size int): Sets the size of the event buffer.
  • WithBatchSize(size int): Sets the number of events to fetch in each batch.
  • WithInitialBackoff(backoff time.Duration): Sets the initial backoff duration for retries.
  • WithMaxBackoff(backoff time.Duration): Sets the maximum backoff duration for retries.
  • WithMaxRetries(maxRetries int): Sets the maximum number of retries for event handling.
  • WithRetryStrategy(strategy RetryStrategy): Sets the retry strategy for event handling.

Example:

watcher, err := registry.Watch(ctx, "my-watcher", &MyHandler{},
    watcher.WithInitialEventIterator(watcher.TrimHorizonIterator()),
    watcher.WithFilter(watcher.EventFilter{
        ObjectTypes: []string{"Job", "Execution"},
        Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
    }),
    watcher.WithBufferSize(1000),
    watcher.WithBatchSize(100),
    watcher.WithMaxRetries(3),
    watcher.WithRetryStrategy(watcher.RetryStrategyBlock),
)

EventStore Configuration (BoltDB)

The BoltDB EventStore can be configured with various options:

  • WithEventsBucket(name string): Sets the name of the bucket used to store events.
  • WithCheckpointBucket(name string): Sets the name of the bucket used to store checkpoints.
  • WithEventSerializer(serializer watcher.Serializer): Sets the serializer used for events.
  • WithCacheSize(size int): Sets the size of the LRU cache used to store events.
  • WithLongPollingTimeout(timeout time.Duration): Sets the timeout duration for long-polling requests.
  • WithGCAgeThreshold(threshold time.Duration): Sets the age threshold for event pruning.
  • WithGCCadence(cadence time.Duration): Sets the interval at which garbage collection runs.
  • WithGCMaxRecordsPerRun(max int): Sets the maximum number of records to process in a single GC run.
  • WithGCMaxDuration(duration time.Duration): Sets the maximum duration for a single GC run.

Example:

store, err := boltdb.NewEventStore(db,
    boltdb.WithEventsBucket("myEvents"),
    boltdb.WithCheckpointBucket("myCheckpoints"),
    boltdb.WithCacheSize(1000),
    boltdb.WithLongPollingTimeout(10*time.Second),
)

Best Practices

  1. Use meaningful watcher IDs to easily identify different components subscribing to events.
  2. Implement error handling in your EventHandler to ensure robust event processing.
  3. Use appropriate filters to minimize unnecessary event processing.
  4. Regularly checkpoint your watchers to enable efficient restarts.
  5. Monitor watcher stats to ensure they're keeping up with event volume.

Troubleshooting

  1. If a watcher is falling behind, consider increasing the batch size or optimizing the event handling logic.
  2. For performance issues, check the BoltDB file size and consider tuning the garbage collection parameters.

Future Improvements

  1. Enhanced monitoring and metrics.

Copy link
Contributor

coderabbitai bot commented Aug 7, 2024

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@wdbaruni wdbaruni requested a review from udsamani August 7, 2024 11:47
@wdbaruni wdbaruni marked this pull request as ready for review August 7, 2024 11:49
@wdbaruni wdbaruni added the build/go A label to indicate buildkite to trigger golang builds for this PR label Aug 7, 2024
Copy link
Member

@frrist frrist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good stuff.

left a couple drive-by comments.

return
}

w.ctx, w.cancel = context.WithCancel(context.Background())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please find an alternative to the embedded context (provide it as parameter instead). It's a pattern we need to avoid. https://go.dev/blog/context-and-structs

Contexts should not be stored inside a struct type, but instead passed to each function that needs it.
When designing an API with context, remember the advice: pass context.Context in as an argument; don’t store it in structs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. I understand the concern about storing the provided context and the bad practice mentioned in the link. Though keep in mind I am not storing a provided context as I am creating a fresh one inside Start. My goal is to have better control over how and when background tasks are stopped, which context cancellation doesn't provide.

In this case, the Start() method doesn't accept a context. Instead, I create a new context inside Start to manage the lifecycle of internal calls and to exit any pending GetEvents or HandleEvent calls. Initially, I used a stop channel and a fresh background context in each loop, with the stop channel canceling the context. But this was more complicated than necessary.

I realized I only needed to store the cancel method, not the context itself. Fixed in e6fd1c9

Comment on lines 76 to 82
err = errors.Join(
validate.NotNil(db, "boltDB instance cannot be nil"),
options.validate(),
)
if err != nil {
return nil, err
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check can be performed before the cache and event store are created.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Fixed in f74da4d

Comment on lines +203 to +208
case <-s.notifyCh:
// New events might be available, loop and check again
// Drain any additional notifications
for len(s.notifyCh) > 0 {
<-s.notifyCh
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this channel?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment in f74da4d

notifyCh is a channel for notifying watchers of new events.
GetEvents will block on this channel when no events are immediately available,
or will return empty events after a long-polling timeout.

Copy link
Contributor

@udsamani udsamani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks Good To Me :) Great Job documenting it ! Really clear for a long PR.

@wdbaruni wdbaruni merged commit f82ca46 into main Aug 8, 2024
9 of 10 checks passed
@wdbaruni wdbaruni deleted the watcher branch August 8, 2024 12:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
build/go A label to indicate buildkite to trigger golang builds for this PR
Projects
None yet
3 participants