Skip to content

Commit

Permalink
pebble: implement WriteTx.batchMu RWMutex
Browse files Browse the repository at this point in the history
according to upstream docs: "A batch is not safe for concurrent use, and
consumers should use a batch per goroutine or provide their own synchronization."
  • Loading branch information
altergui committed Sep 19, 2023
1 parent f31d91c commit fc75e10
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions db/pebbledb/pebledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (

// WriteTx implements the interface db.WriteTx
type WriteTx struct {
batch *pebble.Batch
dbMu *sync.RWMutex
batch *pebble.Batch
batchMu *sync.RWMutex
dbMu *sync.RWMutex
}

// check that WriteTx implements the db.WriteTx interface
Expand Down Expand Up @@ -66,36 +67,48 @@ func iterate(reader pebble.Reader, prefix []byte, callback func(k, v []byte) boo

// Get implements the db.WriteTx.Get interface method
func (tx WriteTx) Get(k []byte) ([]byte, error) {
tx.batchMu.RLock()
defer tx.batchMu.RUnlock()
tx.dbMu.RLock()
defer tx.dbMu.RUnlock()
log.Errorf("WriteTx.Get %x", k)
return get(tx.batch, k)
}

func (tx WriteTx) Iterate(prefix []byte, callback func(k, v []byte) bool) (err error) {
tx.batchMu.RLock()
defer tx.batchMu.RUnlock()
tx.dbMu.RLock()
defer tx.dbMu.RUnlock()
return iterate(tx.batch, prefix, callback)
}

// Set implements the db.WriteTx.Set interface method
func (tx WriteTx) Set(k, v []byte) error {
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
return tx.batch.Set(k, v, nil)
}

// Delete implements the db.WriteTx.Delete interface method
func (tx WriteTx) Delete(k []byte) error {
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
return tx.batch.Delete(k, nil)
}

// Apply implements the db.WriteTx.Apply interface method
func (tx WriteTx) Apply(other db.WriteTx) (err error) {
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
otherPebble := db.UnwrapWriteTx(other).(WriteTx)
return tx.batch.Apply(otherPebble.batch, nil)
}

// Commit implements the db.WriteTx.Commit interface method
func (tx WriteTx) Commit() error {
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
tx.dbMu.Lock()
defer tx.dbMu.Unlock()
log.Errorf("tx.batch.Commit with len %d", tx.batch.Len())
Expand All @@ -105,6 +118,8 @@ func (tx WriteTx) Commit() error {

// Discard implements the db.WriteTx.Discard interface method
func (tx WriteTx) Discard() {
tx.batchMu.Lock()
defer tx.batchMu.Unlock()
// Close returns an error, but here in the Discard context is omitted
tx.batch.Close()
}
Expand Down Expand Up @@ -153,8 +168,9 @@ func (db *PebbleDB) Get(k []byte) ([]byte, error) {
// WriteTx returns a db.WriteTx
func (db *PebbleDB) WriteTx() db.WriteTx {
return WriteTx{
batch: db.db.NewIndexedBatch(),
dbMu: db.mu,
batch: db.db.NewIndexedBatch(),
batchMu: &sync.RWMutex{},
dbMu: db.mu,
}
}

Expand Down

0 comments on commit fc75e10

Please sign in to comment.