Skip to content

Commit

Permalink
Merge pull request #11 from manosriram/feat/batch_write
Browse files Browse the repository at this point in the history
add batch write support
  • Loading branch information
manosriram authored Jan 3, 2024
2 parents e13f921 + a47662b commit 9e1fff8
Show file tree
Hide file tree
Showing 12 changed files with 734 additions and 149 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
run: make test

- name: Run Test with race checks
run: make testrace
run: make test-race

- name: Run Benchmark Test
run: make bench
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ test_data
concurrent_test_data
.DS_Store
.vscode
tests/nimbusdb*
benchmark/nimbusdb*
nimbusdb_temp*
benchmark/nimbusdb_temp*
gcreport.txt
13 changes: 8 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
db:
go run examples/db.go
go run examples/db/db.go

batch:
go run examples/batch/batch.go

build:
go build -v

clean:
rm -rf tests/nimbusdb* benchmark/nimbusdb*
rm -rf ./nimbusdb_temp* benchmark/nimbusdb_temp*

test:
go test ./tests -v
go test -v -failfast
.PHONY: all test

testrace:
go test ./tests -v --race
test-race:
go test -v -failfast --race

bench:
cd benchmark && go test -bench=. -benchmem
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ As time passes, expired or deleted keys take up space that is not useful. Hence,
Supports `Sync` which can be called periodically to remove expired/deleted keys from disk and free-up more space.
</details>

<details>
<summary>
Supports Batch operations
</summary>
Batch operations can be performed and committed to save to disk or rollbacked to discard the batch. Operations
cannot be performed once the batch is closed.
</details>

<details>
<summary>
Single disk-seek write
Expand Down Expand Up @@ -102,6 +110,43 @@ if err != nil {
}
```

#### Batch Operations
```go
d, err := nimbusdb.Open(&nimbusdb.Options{Path: "/path/to/data/directory"})
if err != nil {
// handle error
}
defer d.Close()
b, err := d.NewBatch()
if err != nil {
// handle error
}
defer b.Close()

_, err = b.Set([]byte("key"), []byte("value")) // not written to disk yet.
if err != nil {
// handle error
}

key, err := b.Get([]byte("key"))
if err != nil {
// handle error
}

err = b.Delete([]byte("key"))
if err != nil {
// handle error
}

exists, err := b.Exists([]byte("key"))
if err != nil {
// handle error
}

b.Commit() // write all pending writes to disk
b.Rollback() // discard all pending writes
```


[Progress Board](https://trello.com/b/2eDSLLb3/nimbusdb) | [Streams](https://youtube.com/playlist?list=PLJALjJgNSDVo5veOf2apgMIE1QgN7IEfk) | [godoc](https://pkg.go.dev/github.com/manosriram/nimbusdb)

Expand Down
214 changes: 214 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package nimbusdb

import (
"bytes"
"errors"
"sync"
"time"

"github.com/manosriram/nimbusdb/utils"
)

var (
ERROR_BATCH_CLOSED = errors.New("batch is closed")
ERROR_CANNOT_CLOSE_CLOSED_BATCH = errors.New("cannot close closed batch")
)

type Batch struct {
db *Db
closed bool
batchlock sync.Mutex
mu sync.RWMutex
writeQueue []*KeyValuePair
}

func (db *Db) NewBatch() (*Batch, error) {
b := &Batch{
db: db,
closed: false,
}
b.batchlock.Lock()
return b, nil
}

func (b *Batch) Close() error {
if b.db.closed {
return ERROR_DB_CLOSED
}
if b.closed {
return ERROR_CANNOT_CLOSE_CLOSED_BATCH
}
if len(b.writeQueue) > 0 { // flush all pending queue writes to disk
err := b.Commit()
if err != nil {
return err
}
}
b.closed = true
b.batchlock.Unlock()
return nil
}

func (b *Batch) Get(k []byte) ([]byte, error) {
b.mu.RLock()
defer b.mu.RUnlock()
if b.closed {
return nil, ERROR_BATCH_CLOSED
}

index := -1
var record *KeyValuePair
for i := range b.writeQueue {
if bytes.Equal(k, b.writeQueue[i].Key) {
record = b.writeQueue[i]
index = i
break
}
}

if index != -1 { // key found in write queue
return utils.Encode(record.Value), nil
}

v, err := b.db.getKeyDir(k) // else, search datafiles
if err != nil {
return nil, err
}
return v.v, nil
}

func (b *Batch) Exists(k []byte) (bool, error) {
b.mu.RLock()
defer b.mu.RUnlock()
if b.closed {
return false, ERROR_BATCH_CLOSED
}
index := -1
for i := range b.writeQueue {
if bytes.Equal(k, b.writeQueue[i].Key) {
index = i
break
}
}

if index != -1 { // key found in write queue
return true, nil
}

v, err := b.db.getKeyDir(k) // else, search datafiles
if err != nil {
return false, err
}
return bytes.Equal(v.k, k), nil
}

func (b *Batch) Set(k []byte, v []byte) ([]byte, error) {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return nil, ERROR_BATCH_CLOSED
}
index := -1
for i := range b.writeQueue {
if bytes.Equal(k, b.writeQueue[i].Key) {
index = i
break
}
}
kv := &KeyValuePair{
Key: k,
Value: v,
}
if index != -1 {
b.writeQueue[index] = kv
} else {
b.writeQueue = append(b.writeQueue, kv)
}
return v, nil
}

func (b *Batch) SetWithTTL(k []byte, v []byte, ttl time.Duration) ([]byte, error) {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return nil, ERROR_BATCH_CLOSED
}
index := -1
for i := range b.writeQueue {
if bytes.Equal(k, b.writeQueue[i].Key) {
index = i
break
}
}
kv := &KeyValuePair{
Key: k,
Value: v,
Ttl: ttl,
}
if index != -1 {
b.writeQueue[index] = kv
} else {
b.writeQueue = append(b.writeQueue, kv)
}
return v, nil
}

func (b *Batch) Delete(k []byte) error {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return ERROR_BATCH_CLOSED
}

index := -1
for i := range b.writeQueue {
if bytes.Equal(k, b.writeQueue[i].Key) {
index = i
break
}
}

if index != -1 {
b.writeQueue[index] = b.writeQueue[len(b.writeQueue)-1]
b.writeQueue[len(b.writeQueue)-1] = nil
b.writeQueue = b.writeQueue[:len(b.writeQueue)-1]
} else {
err := b.db.Delete(k)
if err != nil {
return err
}
return nil
}
return nil
}

func (b *Batch) Commit() error {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return ERROR_BATCH_CLOSED
}
var err error
for i := range b.writeQueue {
if b.writeQueue[i].Ttl == 0 {
_, err = b.db.Set(b.writeQueue[i].Key, utils.Encode(b.writeQueue[i].Value))
} else {
_, err = b.db.SetWithTTL(b.writeQueue[i].Key, utils.Encode(b.writeQueue[i].Value), b.writeQueue[i].Ttl)
}
if err != nil {
return err
}
}
b.writeQueue = nil
return nil
}

func (b *Batch) Rollback() error {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return ERROR_BATCH_CLOSED
}
b.writeQueue = nil
return nil
}
Loading

0 comments on commit 9e1fff8

Please sign in to comment.