Use nice atomics
banks committed Oct 7, 2024
1 parent f4390ec commit 18b8290
Showing 1 changed file with 26 additions and 28 deletions.
54 changes: 26 additions & 28 deletions log_cache_async.go
@@ -11,24 +11,21 @@ import (
// writing and flush to disk in background and in larger batches than written
// through group commit.
type LogCacheAsync struct {
- // These first fields are accessed atomically and are first to ensure they
- // stay 64 bit aligned. They are only updated with state locked but are read
- // without holding the mutex by concurrent readers.
-
// lastIndex is the highest readable Log index.
- lastIndex uint64
+ lastIndex atomic.Uint64

// persistentIndex is the highest index known to be safely synced to the
// underlying store.
- persistentIndex uint64
+ persistentIndex atomic.Uint64

// Static params
store LogStore
monotonic bool
size uint64
sizeMask uint64

- // mu protects the mutable state
+ // state is the (non-atomic) mutable state of the cache protected by the
+ // embedded Mutex.
state struct {
sync.Mutex

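A note on the change itself: `atomic.Uint64` (available since Go 1.19) guarantees 64-bit alignment of the value it wraps, which is why the removed comment about keeping these fields first in the struct is no longer needed. A minimal before/after sketch of the pattern, using illustrative names rather than the cache's real fields:

```go
package example

import "sync/atomic"

// Old style: a bare uint64 used with the package-level helpers must be kept
// 64-bit aligned by hand (e.g. placed first in the struct) on 32-bit platforms.
type oldStyle struct {
	lastIndex uint64
}

func (c *oldStyle) last() uint64 { return atomic.LoadUint64(&c.lastIndex) }

// New style: atomic.Uint64 guarantees its own alignment and can live anywhere
// in the struct; Load/Store methods replace the package-level helpers.
type newStyle struct {
	lastIndex atomic.Uint64
}

func (c *newStyle) last() uint64 { return c.lastIndex.Load() }
```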
@@ -68,12 +65,12 @@ func NewLogCacheAsync(capacity int, store LogStore) (*LogCacheAsync, error) {
size := nextPowerOf2(uint64(capacity))

c := &LogCacheAsync{
- lastIndex: last,
- persistentIndex: last,
- store: store,
- size: size,
- sizeMask: size - 1, // 0b10000 -> 0b01111
+ store: store,
+ size: size,
+ sizeMask: size - 1, // 0b10000 -> 0b01111
}
+ c.lastIndex.Store(last)
+ c.persistentIndex.Store(last)
c.state.cache = make([]*Log, size)
c.state.triggerChan = make(chan syncRequest, 1)

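The constructor keeps the cache size a power of two so that a bitwise AND with `size - 1` can stand in for a modulo when mapping a log index to a ring slot, which is what the `0b10000 -> 0b01111` comment is hinting at. A small illustration of that mapping; `nextPowerOf2` here is a stand-in, since the real helper's body is not part of this diff:

```go
package example

import "math/bits"

// nextPowerOf2 is a stand-in for the helper used by NewLogCacheAsync; the
// real implementation is not shown in this diff.
func nextPowerOf2(n uint64) uint64 {
	if n == 0 {
		return 1
	}
	return uint64(1) << bits.Len64(n-1)
}

// slotFor maps a log index to a ring slot. Because size is a power of two,
// index & (size-1) is the same as index % size, but cheaper.
func slotFor(index, sizeMask uint64) uint64 {
	return index & sizeMask
}

// For example, capacity 10 rounds up to size 16 (mask 0b01111), so log index
// 21 lands in slot 21 & 15 == 5, exactly 21 % 16.
```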
@@ -99,14 +96,14 @@ func (c *LogCacheAsync) FirstIndex() (uint64, error) {

// LastIndex returns the last index written. 0 for no entries.
func (c *LogCacheAsync) LastIndex() (uint64, error) {
- return atomic.LoadUint64(&c.lastIndex), nil
+ return c.lastIndex.Load(), nil
}

// minPossibleIdx is the lowest log we could possibly have cached. We might
// not have that low because we just started or because we are currently
// writing a batch over the top, but it's a lower bound.
func (c *LogCacheAsync) minPossibleIdx() uint64 {
- lastIdx := atomic.LoadUint64(&c.lastIndex)
+ lastIdx := c.lastIndex.Load()
minPossibleIdx := uint64(1)
if lastIdx > c.size {
minPossibleIdx = lastIdx - c.size
@@ -166,8 +163,8 @@ func (c *LogCacheAsync) StoreLogs(logs []*Log) error {
c.state.cache[l.Index&c.sizeMask] = l
}
lastIdx := logs[len(logs)-1].Index
- atomic.StoreUint64(&c.lastIndex, lastIdx)
- atomic.StoreUint64(&c.persistentIndex, lastIdx)
+ c.lastIndex.Store(lastIdx)
+ c.persistentIndex.Store(lastIdx)
c.state.Unlock()
return nil
}
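The StoreLogs path shows the intended split between the two synchronisation mechanisms: slots are written while holding the state mutex, and only afterwards are the atomics bumped, so a lock-free reader such as `LastIndex` never sees an index pointing at a slot that has not been fully written. A rough sketch of that writer/reader split, using illustrative names rather than the cache's actual API:

```go
package example

import (
	"sync"
	"sync/atomic"
)

type ring struct {
	lastIndex atomic.Uint64
	mu        sync.Mutex
	slots     []uint64
	mask      uint64
}

// append fills the slots under the mutex, then publishes the new high-water
// mark through the atomic so it only becomes visible once the slots are
// fully written.
func (r *ring) append(vals ...uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	last := r.lastIndex.Load()
	for _, v := range vals {
		last++
		r.slots[last&r.mask] = v
	}
	r.lastIndex.Store(last)
}

// last mirrors LastIndex in the cache: a read that needs no mutex.
func (r *ring) last() uint64 { return r.lastIndex.Load() }
```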
@@ -270,8 +267,8 @@ func (c *LogCacheAsync) runFlusher() {

// Load the state under lock
c.state.Lock()
- persistedIdx := atomic.LoadUint64(&c.persistentIndex)
- lastIdx := atomic.LoadUint64(&c.lastIndex)
+ persistedIdx := c.persistentIndex.Load()
+ lastIdx := c.lastIndex.Load()

// Make sure to reset batch!
batch = batch[:0]
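`batch = batch[:0]` resets the length while keeping the backing array, so the flusher reuses one allocation across iterations. The part of the loop that fills the batch is elided from this hunk; a hypothetical sketch of what that gather step presumably does, assuming it copies every entry newer than `persistedIdx` out of the ring while the lock is held:

```go
// collectUnpersisted is a hypothetical helper, not code from this commit: it
// reuses batch's backing array and copies the entries in (persistedIdx,
// lastIdx] out of the ring. Log is the entry type defined elsewhere in this
// package.
func collectUnpersisted(batch, cache []*Log, persistedIdx, lastIdx, sizeMask uint64) []*Log {
	batch = batch[:0] // keep capacity, drop previous contents
	for idx := persistedIdx + 1; idx <= lastIdx; idx++ {
		batch = append(batch, cache[idx&sizeMask])
	}
	return batch
}
```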
@@ -346,7 +343,7 @@ func (c *LogCacheAsync) deliverCompletionLocked(lwc *LogWriteCompletion) {
return
}
if lwc.Error == nil {
- atomic.StoreUint64(&c.persistentIndex, lwc.PersistentIndex)
+ c.persistentIndex.Store(lwc.PersistentIndex)
}
c.state.completionCh <- *lwc
}
@@ -369,7 +366,7 @@ func (c *LogCacheAsync) doFlush(logs []*Log, start time.Time) *LogWriteCompletion
}
if err == nil {
lwc.PersistentIndex = logs[len(logs)-1].Index
- atomic.StoreUint64(&c.persistentIndex, logs[len(logs)-1].Index)
+ c.persistentIndex.Store(lwc.PersistentIndex)
}
return &lwc
}
@@ -409,18 +406,19 @@ func (c *LogCacheAsync) StoreLogsAsync(logs []*Log) error {

start := time.Now()

- persistedIdx := atomic.LoadUint64(&c.persistentIndex)
- lastIdx := atomic.LoadUint64(&c.lastIndex)
+ persistedIdx := c.persistentIndex.Load()
+ lastIdx := c.lastIndex.Load()

// Make sure there is room in the cache for all the logs we need to write.
// It's very unlikely there won't be, but if we are writing really fast into a
// small cache and the flusher is blocked on IO for a while then we need to
// ensure we don't overwrite cache entries that are not persistent yet!
for !hasSpaceFor(len(logs), lastIdx, persistedIdx, c.size) {
- // We need to block and wait for they sync thread to persist some more
+ // We need to block and wait for the flusher thread to persist some more
// stuff! We do that by sending a sync request even though it's already busy
- // this lets us get notified about when it's free. Note that even though we
- // unlock and it's _possible_ for another StoreLogsAsync call to be made,
+ // this lets us get notified about when it's free. Note that we don't need
+ // to worry about concurrent calls to StoreLogsAsync since the leader loop
+ // can only call it from one goroutine.
doneCh := make(chan struct{})
// Unlock before we send since we might block if the flusher is busy but it
// won't be able to complete without the lock.
@@ -430,8 +428,8 @@ func (c *LogCacheAsync) StoreLogsAsync(logs []*Log) error {
c.state.Lock()
// Reload the indexes now sync is done so we can check if there is space
// now.
- persistedIdx = atomic.LoadUint64(&c.persistentIndex)
- lastIdx = atomic.LoadUint64(&c.lastIndex)
+ persistedIdx = c.persistentIndex.Load()
+ lastIdx = c.lastIndex.Load()
}

writeIdx := lastIdx + 1
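The retry loop above relies on `hasSpaceFor`, whose body is not part of this diff. The invariant it has to enforce is that appending `len(logs)` more entries after `lastIdx` must not overwrite any slot holding an entry newer than `persistedIdx`, since those are not yet safely on disk. A purely illustrative version of that check:

```go
// hasSpaceFor (illustrative only, not the real implementation) reports
// whether n more entries fit after lastIdx without clobbering any slot whose
// entry has not yet been persisted.
func hasSpaceFor(n int, lastIdx, persistedIdx, size uint64) bool {
	unpersisted := lastIdx - persistedIdx
	return unpersisted+uint64(n) <= size
}
```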
@@ -444,7 +442,7 @@ func (c *LogCacheAsync) StoreLogsAsync(logs []*Log) error {
lastIdx = log.Index
writeIdx++
}
- atomic.StoreUint64(&c.lastIndex, lastIdx)
+ c.lastIndex.Store(lastIdx)

// Trigger a sync in the background
doneCh := make(chan struct{})
