Skip to content

Commit

Permalink
pebble: implement WriteTx.batchMu RWMutex
Browse files Browse the repository at this point in the history
according to upstream docs: "A batch is not safe for concurrent use, and
consumers should use a batch per goroutine or provide their own synchronization."
  • Loading branch information
altergui committed Sep 19, 2023
1 parent 9773b0a commit a58e7e8
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions db/pebbledb/pebledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (

// WriteTx implements the interface db.WriteTx
type WriteTx struct {
batch *pebble.Batch
dbMu *sync.RWMutex
batch *pebble.Batch
dbMu *sync.RWMutex
batchMu *sync.RWMutex
}

// check that WriteTx implements the db.WriteTx interface
Expand Down Expand Up @@ -68,28 +69,38 @@ func iterate(reader pebble.Reader, prefix []byte, callback func(k, v []byte) boo
func (tx WriteTx) Get(k []byte) ([]byte, error) {
tx.dbMu.RLock()
defer tx.dbMu.RUnlock()
tx.batchMu.RLock()
defer tx.batchMu.RUnlock()
log.Errorf("WriteTx.Get %x", k)
return get(tx.batch, k)
}

func (tx WriteTx) Iterate(prefix []byte, callback func(k, v []byte) bool) (err error) {
tx.dbMu.RLock()
defer tx.dbMu.RUnlock()
tx.batchMu.RLock()
defer tx.batchMu.RUnlock()
return iterate(tx.batch, prefix, callback)
}

// Set implements the db.WriteTx.Set interface method
func (tx WriteTx) Set(k, v []byte) error {
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
return tx.batch.Set(k, v, nil)
}

// Delete implements the db.WriteTx.Delete interface method
func (tx WriteTx) Delete(k []byte) error {
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
return tx.batch.Delete(k, nil)
}

// Apply implements the db.WriteTx.Apply interface method
func (tx WriteTx) Apply(other db.WriteTx) (err error) {
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
otherPebble := db.UnwrapWriteTx(other).(WriteTx)
return tx.batch.Apply(otherPebble.batch, nil)
}
Expand All @@ -98,13 +109,17 @@ func (tx WriteTx) Apply(other db.WriteTx) (err error) {
func (tx WriteTx) Commit() error {
tx.dbMu.Lock()
defer tx.dbMu.Unlock()
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
log.Errorf("tx.batch.Commit with len %d", tx.batch.Len())
defer log.Errorf("tx.batch.Commit end, stats %v", tx.batch.CommitStats())
return tx.batch.Commit(nil)
}

// Discard implements the db.WriteTx.Discard interface method
func (tx WriteTx) Discard() {
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
// Close returns an error, but here in the Discard context is omitted
tx.batch.Close()
}
Expand Down

0 comments on commit a58e7e8

Please sign in to comment.