Skip to content

Commit

Permalink
pebble: implement db.mu RWMutex
Browse files Browse the repository at this point in the history
without this, when Get is called concurrently during Commit,
pebble.ErrNotFound is returned by Get
  • Loading branch information
altergui committed Sep 19, 2023
1 parent ec5aa37 commit 9773b0a
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions db/pebbledb/pebledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pebbledb
import (
"errors"
"os"
"sync"

"github.com/cockroachdb/pebble"
"go.vocdoni.io/dvote/db"
Expand All @@ -12,6 +13,7 @@ import (
// WriteTx implements the interface db.WriteTx
type WriteTx struct {
batch *pebble.Batch
dbMu *sync.RWMutex
}

// check that WriteTx implements the db.WriteTx interface
Expand Down Expand Up @@ -64,11 +66,15 @@ func iterate(reader pebble.Reader, prefix []byte, callback func(k, v []byte) boo

// Get implements the db.WriteTx.Get interface method
func (tx WriteTx) Get(k []byte) ([]byte, error) {
tx.dbMu.RLock()
defer tx.dbMu.RUnlock()
log.Errorf("WriteTx.Get %x", k)
return get(tx.batch, k)
}

func (tx WriteTx) Iterate(prefix []byte, callback func(k, v []byte) bool) (err error) {
tx.dbMu.RLock()
defer tx.dbMu.RUnlock()
return iterate(tx.batch, prefix, callback)
}

Expand All @@ -90,6 +96,8 @@ func (tx WriteTx) Apply(other db.WriteTx) (err error) {

// Commit implements the db.WriteTx.Commit interface method
func (tx WriteTx) Commit() error {
tx.dbMu.Lock()
defer tx.dbMu.Unlock()
log.Errorf("tx.batch.Commit with len %d", tx.batch.Len())
defer log.Errorf("tx.batch.Commit end, stats %v", tx.batch.CommitStats())
return tx.batch.Commit(nil)
Expand All @@ -104,6 +112,7 @@ func (tx WriteTx) Discard() {
// PebbleDB implements db.Database interface
type PebbleDB struct {
db *pebble.DB
mu *sync.RWMutex
}

// check that PebbleDB implements the db.Database interface
Expand Down Expand Up @@ -134,6 +143,8 @@ func New(opts db.Options) (*PebbleDB, error) {

// Get implements the db.WriteTx.Get interface method
func (db *PebbleDB) Get(k []byte) ([]byte, error) {
db.mu.RLock()
defer db.mu.RUnlock()
log.Errorf("PebbleDB.Get %x", k)
return get(db.db, k)
}
Expand All @@ -142,11 +153,14 @@ func (db *PebbleDB) Get(k []byte) ([]byte, error) {
func (db *PebbleDB) WriteTx() db.WriteTx {
return WriteTx{
batch: db.db.NewIndexedBatch(),
dbMu: db.mu,
}
}

// Close closes the PebbleDB
func (db *PebbleDB) Close() error {
db.mu.Lock()
defer db.mu.Unlock()
return db.db.Close()
}

Expand Down

0 comments on commit 9773b0a

Please sign in to comment.