diff --git a/db/change_cache.go b/db/change_cache.go index 19995d5ae7..bbeecdcbee 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -576,7 +576,6 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64 } else { changedChannels.Add(unusedSeq) } - c.channelCache.AddUnusedSequence(change) if c.notifyChange != nil && len(changedChannels) > 0 { c.notifyChange(ctx, changedChannels) } @@ -599,7 +598,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen } changedChannels := c.processEntry(ctx, change) allChangedChannels = allChangedChannels.Update(changedChannels) - c.channelCache.AddUnusedSequence(change) if c.notifyChange != nil { c.notifyChange(ctx, allChangedChannels) } @@ -609,9 +607,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen // push unused range to either pending or skipped lists based on current state of the change cache allChangedChannels = c.processUnusedRange(ctx, fromSequence, toSequence, allChangedChannels, timeReceived) - // update high seq cached - c.channelCache.AddUnusedSequence(&LogEntry{Sequence: toSequence}) - if c.notifyChange != nil { c.notifyChange(ctx, allChangedChannels) } @@ -804,8 +799,9 @@ func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) []chann } delete(c.receivedSeqs, change.Sequence) - // If unused sequence or principal, we're done after updating sequence + // If unused sequence, notify the cache and return if change.DocID == "" { + c.channelCache.AddUnusedSequence(change) return nil } diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 2ad69f998b..a8621caef6 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -2430,7 +2430,7 @@ func TestReleasedSequenceRangeHandlingEverythingPending(t *testing.T) { assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) assert.Equal(c, uint64(2), testChangeCache.nextSequence) dbContext.UpdateCalculatedStats(ctx) - assert.Equal(c, int64(25), dbContext.DbStats.CacheStats.HighSeqCached.Value()) + assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.HighSeqCached.Value()) }, time.Second*10, time.Millisecond*100) } @@ -2536,7 +2536,7 @@ func TestReleasedSequenceRangeHandlingEverythingPendingLowPendingCapacity(t *tes defer testChangeCache.Stop(ctx) require.NoError(t, err) - // process unused sequence range + // process unused sequence range, will be sent to pending. Triggers seq 1 being sent to skipped testChangeCache.releaseUnusedSequenceRange(ctx, 2, 25, time.Now()) require.EventuallyWithT(t, func(c *assert.CollectT) { @@ -2647,7 +2647,7 @@ func TestReleasedSequenceRangeHandlingSingleSequence(t *testing.T) { assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value()) assert.Equal(c, uint64(1), testChangeCache.nextSequence) dbContext.UpdateCalculatedStats(ctx) - assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.HighSeqCached.Value()) + assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.HighSeqCached.Value()) }, time.Second*10, time.Millisecond*100) // process change that should overload pending and push sequence 1 to skipped diff --git a/db/channel_cache.go b/db/channel_cache.go index d4ab42371d..716a996a92 100644 --- a/db/channel_cache.go +++ b/db/channel_cache.go @@ -188,7 +188,11 @@ func (c *channelCacheImpl) AddPrincipal(change *LogEntry) { // Add unused Sequence notifies the cache of an unused sequence update. Updates the cache's high sequence func (c *channelCacheImpl) AddUnusedSequence(change *LogEntry) { - c.updateHighCacheSequence(change.Sequence) + if change.EndSequence > 0 { + c.updateHighCacheSequence(change.EndSequence) + } else { + c.updateHighCacheSequence(change.Sequence) + } } // Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence