Skip to content

Commit

Permalink
Handle watchers with future sequence numbers gracefully (#4773)
Browse files Browse the repository at this point in the history
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced logic for determining the starting iterator in the watcher,
improving error handling for various iterator types.
  
- **Bug Fixes**
- Improved handling of scenarios where the requested sequence number is
unavailable, preventing potential issues during event processing.

- **Tests**
- Expanded test cases for the watcher to cover additional iterator
behaviors and error handling scenarios, enhancing overall test coverage.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
wdbaruni authored Dec 16, 2024
1 parent 36c44b5 commit 823e9e9
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 9 deletions.
24 changes: 20 additions & 4 deletions pkg/lib/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,27 @@ func (w *watcher) determineStartingIterator(ctx context.Context, initial EventIt
}

// No checkpoint found, handle initial iterator
if initial.Type == EventIteratorTrimHorizon {
return initial, nil
}

latestSeqNum, err := w.store.GetLatestEventNum(ctx)
if err != nil {
return EventIterator{}, err
}

// If the requested sequence number is the latest, start from the current latest seqNum
if initial.Type == EventIteratorLatest {
latestSeqNum, err := w.store.GetLatestEventNum(ctx)
if err != nil {
return EventIterator{}, err
}
return AfterSequenceNumberIterator(latestSeqNum), nil
}

// If the requested sequence number is higher than the latest, start from the latest
if initial.SequenceNumber > latestSeqNum {
log.Ctx(ctx).Warn().
Str("watcher_id", w.id).
Uint64("requested_seq", initial.SequenceNumber).
Uint64("latest_seq", latestSeqNum).
Msg("requested sequence number is higher than latest, starting from latest instead")
return AfterSequenceNumberIterator(latestSeqNum), nil
}

Expand Down
40 changes: 35 additions & 5 deletions pkg/lib/watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ func (s *WatcherTestSuite) TestDetermineStartingIterator() {
latestErr error
}{
{
name: "No checkpoint, non-latest iterator",
initialIter: watcher.AfterSequenceNumberIterator(5),
expectedIter: watcher.AfterSequenceNumberIterator(5),
name: "No checkpoint, non-latest iterator",
setupLatestEvent: ptr(uint64(10)), // Store event up to seq 15
initialIter: watcher.AfterSequenceNumberIterator(5),
expectedIter: watcher.AfterSequenceNumberIterator(5),
},
{
name: "With checkpoint, non-latest iterator",
Expand Down Expand Up @@ -152,6 +153,16 @@ func (s *WatcherTestSuite) TestDetermineStartingIterator() {
initialIter: watcher.LatestIterator(),
expectedIter: watcher.AfterSequenceNumberIterator(0),
},
{
name: "Empty store, at iterator",
initialIter: watcher.AtSequenceNumberIterator(0),
expectedIter: watcher.AtSequenceNumberIterator(0),
},
{
name: "Empty store, after iterator",
initialIter: watcher.AfterSequenceNumberIterator(0),
expectedIter: watcher.AfterSequenceNumberIterator(0),
},
{
name: "TrimHorizon with checkpoint",
setupCheckpoint: ptr(uint64(10)),
Expand All @@ -163,6 +174,25 @@ func (s *WatcherTestSuite) TestDetermineStartingIterator() {
initialIter: watcher.TrimHorizonIterator(),
expectedIter: watcher.TrimHorizonIterator(),
},
{
name: "Sequence at latest",
setupLatestEvent: ptr(uint64(15)),
initialIter: watcher.AtSequenceNumberIterator(15),
expectedIter: watcher.AtSequenceNumberIterator(15),
},

{
name: "Sequence too high, start after latest",
setupLatestEvent: ptr(uint64(15)),
initialIter: watcher.AtSequenceNumberIterator(20),
expectedIter: watcher.AfterSequenceNumberIterator(15),
},
{
name: "After sequence too high, start after latest",
setupLatestEvent: ptr(uint64(15)),
initialIter: watcher.AfterSequenceNumberIterator(20),
expectedIter: watcher.AfterSequenceNumberIterator(15),
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -657,7 +687,7 @@ func (s *WatcherTestSuite) TestEmptyEventStoreWithDifferentIterators() {
{
name: "AtSequenceNumber(1)",
iterator: watcher.AtSequenceNumberIterator(1),
expectedIterator: watcher.AtSequenceNumberIterator(1),
expectedIterator: watcher.AfterSequenceNumberIterator(0),
},
{
name: "AfterSequenceNumber(0)",
Expand All @@ -667,7 +697,7 @@ func (s *WatcherTestSuite) TestEmptyEventStoreWithDifferentIterators() {
{
name: "AfterSequenceNumber(1)",
iterator: watcher.AfterSequenceNumberIterator(1),
expectedIterator: watcher.AfterSequenceNumberIterator(1),
expectedIterator: watcher.AfterSequenceNumberIterator(0),
},
}

Expand Down

0 comments on commit 823e9e9

Please sign in to comment.