diff --git a/pkg/lib/watcher/watcher.go b/pkg/lib/watcher/watcher.go index 7934842855..1b5bfb64a2 100644 --- a/pkg/lib/watcher/watcher.go +++ b/pkg/lib/watcher/watcher.go @@ -92,11 +92,27 @@ func (w *watcher) determineStartingIterator(ctx context.Context, initial EventIt } // No checkpoint found, handle initial iterator + if initial.Type == EventIteratorTrimHorizon { + return initial, nil + } + + latestSeqNum, err := w.store.GetLatestEventNum(ctx) + if err != nil { + return EventIterator{}, err + } + + // If the requested sequence number is the latest, start from the current latest seqNum if initial.Type == EventIteratorLatest { - latestSeqNum, err := w.store.GetLatestEventNum(ctx) - if err != nil { - return EventIterator{}, err - } + return AfterSequenceNumberIterator(latestSeqNum), nil + } + + // If the requested sequence number is higher than the latest, start from the latest + if initial.SequenceNumber > latestSeqNum { + log.Ctx(ctx).Warn(). + Str("watcher_id", w.id). + Uint64("requested_seq", initial.SequenceNumber). + Uint64("latest_seq", latestSeqNum). + Msg("requested sequence number is higher than latest, starting from latest instead") return AfterSequenceNumberIterator(latestSeqNum), nil } diff --git a/pkg/lib/watcher/watcher_test.go b/pkg/lib/watcher/watcher_test.go index 82603a9ba1..5f8e59a100 100644 --- a/pkg/lib/watcher/watcher_test.go +++ b/pkg/lib/watcher/watcher_test.go @@ -112,9 +112,10 @@ func (s *WatcherTestSuite) TestDetermineStartingIterator() { latestErr error }{ { - name: "No checkpoint, non-latest iterator", - initialIter: watcher.AfterSequenceNumberIterator(5), - expectedIter: watcher.AfterSequenceNumberIterator(5), + name: "No checkpoint, non-latest iterator", + setupLatestEvent: ptr(uint64(10)), // Store event up to seq 15 + initialIter: watcher.AfterSequenceNumberIterator(5), + expectedIter: watcher.AfterSequenceNumberIterator(5), }, { name: "With checkpoint, non-latest iterator", @@ -152,6 +153,16 @@ func (s *WatcherTestSuite) TestDetermineStartingIterator() { initialIter: watcher.LatestIterator(), expectedIter: watcher.AfterSequenceNumberIterator(0), }, + { + name: "Empty store, at iterator", + initialIter: watcher.AtSequenceNumberIterator(0), + expectedIter: watcher.AtSequenceNumberIterator(0), + }, + { + name: "Empty store, after iterator", + initialIter: watcher.AfterSequenceNumberIterator(0), + expectedIter: watcher.AfterSequenceNumberIterator(0), + }, { name: "TrimHorizon with checkpoint", setupCheckpoint: ptr(uint64(10)), @@ -163,6 +174,25 @@ func (s *WatcherTestSuite) TestDetermineStartingIterator() { initialIter: watcher.TrimHorizonIterator(), expectedIter: watcher.TrimHorizonIterator(), }, + { + name: "Sequence at latest", + setupLatestEvent: ptr(uint64(15)), + initialIter: watcher.AtSequenceNumberIterator(15), + expectedIter: watcher.AtSequenceNumberIterator(15), + }, + + { + name: "Sequence too high, start after latest", + setupLatestEvent: ptr(uint64(15)), + initialIter: watcher.AtSequenceNumberIterator(20), + expectedIter: watcher.AfterSequenceNumberIterator(15), + }, + { + name: "After sequence too high, start after latest", + setupLatestEvent: ptr(uint64(15)), + initialIter: watcher.AfterSequenceNumberIterator(20), + expectedIter: watcher.AfterSequenceNumberIterator(15), + }, } for _, tc := range testCases { @@ -657,7 +687,7 @@ func (s *WatcherTestSuite) TestEmptyEventStoreWithDifferentIterators() { { name: "AtSequenceNumber(1)", iterator: watcher.AtSequenceNumberIterator(1), - expectedIterator: watcher.AtSequenceNumberIterator(1), + expectedIterator: watcher.AfterSequenceNumberIterator(0), }, { name: "AfterSequenceNumber(0)", @@ -667,7 +697,7 @@ func (s *WatcherTestSuite) TestEmptyEventStoreWithDifferentIterators() { { name: "AfterSequenceNumber(1)", iterator: watcher.AfterSequenceNumberIterator(1), - expectedIterator: watcher.AfterSequenceNumberIterator(1), + expectedIterator: watcher.AfterSequenceNumberIterator(0), }, }