Skip to content

Commit

Permalink
Merge pull request #12 from manosriram/feat/watch
Browse files Browse the repository at this point in the history
Feat: Watcher
  • Loading branch information
manosriram authored Jan 24, 2024
2 parents c3e3b62 + a2364f0 commit 5eb57f1
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 21 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ concurrent_test_data
nimbusdb_temp*
benchmark/nimbusdb_temp*
gcreport.txt
*.dfile
*.idfile
55 changes: 55 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,61 @@ b.Commit() // write all pending writes to disk
b.Rollback() // discard all pending writes
```

#### Watch keys
```go
func watchKeyChange(ch chan nimbusdb.WatcherEvent) {
for event := range ch {
switch event.EventType {
case "CREATE":
// Handle create key event
break

case "UPDATE":
// Handle update key event
break

case "DELETE":
// Handle delete key event
break
}
}
}

func main() {
d, err := nimbusdb.Open(&nimbusdb.Options{Path: "/path/to/data/directory", ShouldWatch: true})
if err != nil {
// handle error
}
defer d.Close()
defer d.CloseWatch() // optional

watchChannel, err := d.Watch()
if err != nil {
// handle error
}

go watchEvents(watchChannel)

kvPair := &nimbusdb.KeyValuePair{
Key: []byte("key"),
Value: []byte("value"),
}
setValue, err := d.Set(kvPair) // will trigger an CREATE event
if err != nil {
// handle error
}

setValue, err := d.Set(kvPair) // will trigger an UPDATE event
if err != nil {
// handle error
}

err = d.Delete(kvPair.Key) // will trigger an DELETE event
if err != nil {
// handle error
}
}
```

[Progress Board](https://trello.com/b/2eDSLLb3/nimbusdb) | [Streams](https://youtube.com/playlist?list=PLJALjJgNSDVo5veOf2apgMIE1QgN7IEfk) | [godoc](https://pkg.go.dev/github.com/manosriram/nimbusdb)

Expand Down
27 changes: 24 additions & 3 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/manosriram/nimbusdb/utils"
"github.com/segmentio/ksuid"
)

var (
Expand All @@ -15,6 +16,7 @@ var (
)

type Batch struct {
id ksuid.KSUID
db *Db
closed bool
batchlock sync.Mutex
Expand All @@ -26,6 +28,7 @@ func (db *Db) NewBatch() (*Batch, error) {
b := &Batch{
db: db,
closed: false,
id: ksuid.New(),
}
b.batchlock.Lock()
return b, nil
Expand Down Expand Up @@ -188,16 +191,34 @@ func (b *Batch) Commit() error {
if b.closed {
return ERROR_BATCH_CLOSED
}
var err error

for i := range b.writeQueue {
k := b.writeQueue[i].Key
v := utils.Encode(b.writeQueue[i].Value)
var existingValueForKey []byte
existingValueEntryForKey, err := b.db.Get(k)
if err != nil {
existingValueForKey = nil
} else {
existingValueForKey = existingValueEntryForKey
}

if b.writeQueue[i].Ttl == 0 {
_, err = b.db.Set(b.writeQueue[i].Key, utils.Encode(b.writeQueue[i].Value))
_, err = b.db.Set(k, v)
} else {
_, err = b.db.SetWithTTL(b.writeQueue[i].Key, utils.Encode(b.writeQueue[i].Value), b.writeQueue[i].Ttl)
_, err = b.db.SetWithTTL(k, v, b.writeQueue[i].Ttl)
}
if err != nil {
return err
}

if b.db.opts.ShouldWatch {
if existingValueForKey == nil {
b.db.SendWatchEvent(NewCreateWatcherEvent(k, existingValueForKey, v, &b.id))
} else {
b.db.SendWatchEvent(NewUpdateWatcherEvent(k, existingValueForKey, v, &b.id))
}
}
}
b.writeQueue = nil
return nil
Expand Down
68 changes: 62 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ const (
)

type Options struct {
IsMerge bool
MergeFilePath string
Path string
IsMerge bool
MergeFilePath string
Path string
ShouldWatch bool
WatchQueueSize int
}

type KeyValuePair struct {
Expand Down Expand Up @@ -133,9 +135,10 @@ type Db struct {
mu sync.RWMutex
segments map[string]*Segment
lru *expirable.LRU[int64, *Block]
watcher chan WatcherEvent
}

func NewDb(dirPath string) *Db {
func NewDb(dirPath string, opts ...*Options) *Db {
segments := make(map[string]*Segment, 0)
db := &Db{
dirPath: dirPath,
Expand All @@ -145,6 +148,19 @@ func NewDb(dirPath string) *Db {
},
lru: expirable.NewLRU[int64, *Block](LRU_SIZE, nil, LRU_TTL),
segments: segments,
opts: &Options{
ShouldWatch: false,
},
}

db.watcher = make(chan WatcherEvent, func() int {
if len(opts) > 0 {
return opts[0].WatchQueueSize
}
return 0
}())
if len(opts) > 0 {
db.opts.ShouldWatch = opts[0].ShouldWatch
}

return db
Expand Down Expand Up @@ -227,7 +243,6 @@ func (db *Db) setKeyDir(key []byte, kdValue KeyDirValue) (interface{}, error) {
}
db.removeBlockCache(segment.getBlockNumber())
}

db.keyDir.Set(key, kdValue)
db.lastOffset.Store(kdValue.offset + kdValue.size)

Expand Down Expand Up @@ -471,7 +486,7 @@ func Open(opts *Options) (*Db, error) {

dirPath = utils.JoinPaths(home, DefaultDataDir)
}
db := NewDb(dirPath)
db := NewDb(dirPath, opts)
go db.handleInterrupt()

err := os.MkdirAll(dirPath, os.ModePerm)
Expand Down Expand Up @@ -527,6 +542,7 @@ func (db *Db) Close() error {
}
}
db.closed = true
close(db.watcher)
return nil
}

Expand Down Expand Up @@ -592,6 +608,16 @@ func (db *Db) Set(k []byte, v []byte) ([]byte, error) {
intKSz := int64(len(k))
intVSz := int64(len(utils.Encode(v)))

var existingValueForKey []byte
if db.opts.ShouldWatch {
existingValueEntryForKey, err := db.Get(k)
if err != nil {
existingValueForKey = nil
} else {
existingValueForKey = existingValueEntryForKey
}
}

newKeyValueEntry := &KeyValueEntry{
deleted: DELETED_FLAG_UNSET_VALUE,
offset: db.getLastOffset(),
Expand All @@ -614,17 +640,36 @@ func (db *Db) Set(k []byte, v []byte) ([]byte, error) {

kdValue := NewKeyDirValue(newKeyValueEntry.offset, newKeyValueEntry.size, newKeyValueEntry.tstamp, utils.GetFilenameWithoutExtension(db.activeDataFile))
_, err = db.setKeyDir(k, *kdValue)

if err != nil {
return nil, err
}

// do not watch if ShouldWatch is set with options
if db.opts.ShouldWatch {
if existingValueForKey == nil {
db.SendWatchEvent(NewCreateWatcherEvent(k, existingValueForKey, v, nil))
} else {
db.SendWatchEvent(NewUpdateWatcherEvent(k, existingValueForKey, v, nil))
}
}
return v, err
}

func (db *Db) SetWithTTL(k []byte, v []byte, ttl time.Duration) (interface{}, error) {
intKSz := int64(len(k))
intVSz := int64(len(utils.Encode(v)))

var existingValueForKey []byte
if db.opts.ShouldWatch {
existingValueEntryForKey, err := db.Get(k)
if err != nil {
existingValueForKey = nil
} else {
existingValueForKey = existingValueEntryForKey
}
}

newKeyValueEntry := &KeyValueEntry{
deleted: DELETED_FLAG_UNSET_VALUE,
offset: db.getLastOffset(),
Expand All @@ -651,6 +696,14 @@ func (db *Db) SetWithTTL(k []byte, v []byte, ttl time.Duration) (interface{}, er
return nil, err
}

// do not watch if ShouldWatch is set with options
if db.opts.ShouldWatch {
if existingValueForKey == nil {
db.SendWatchEvent(NewCreateWatcherEvent(k, existingValueForKey, v, nil))
} else {
db.SendWatchEvent(NewUpdateWatcherEvent(k, existingValueForKey, v, nil))
}
}
return v, err
}

Expand All @@ -661,6 +714,9 @@ func (db *Db) Delete(key []byte) error {
defer db.mu.Unlock()

err := db.deleteKey(key)
if db.opts.ShouldWatch {
db.SendWatchEvent(NewDeleteWatcherEvent(key, nil, nil, nil))
}
return err
}

Expand Down
3 changes: 2 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

var opts = &Options{
Path: utils.DbDir(),
Path: utils.DbDir(),
ShouldWatch: false,
}

const (
Expand Down
16 changes: 12 additions & 4 deletions examples/b.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@ package main
import (
"fmt"

"golang.org/x/exp/slices"
"github.com/manosriram/nimbusdb"
)

func main() {
x := []int{1, 2, 3}
d, _ := nimbusdb.Open(&nimbusdb.Options{
Path: "./",
ShouldWatch: true,
})

slices.Delete(x, 0, 1)
fmt.Println(x)
ch, _ := d.NewWatch()

k := []byte("testkey1")
v := []byte("testvalue1")
d.Set(k, v)

fmt.Printf("%v\n", <-ch)
}
31 changes: 30 additions & 1 deletion examples/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bufio"
"fmt"
"log"
"os"
"strings"

Expand All @@ -13,9 +14,37 @@ const (
DirPath = "/Users/manosriram/nimbusdb/test_data"
)

func watchKeyChange(ch chan nimbusdb.WatcherEvent) {
for event := range ch {
switch event.EventType {
case "CREATE":
log.Printf("got event: %s for Key %s with Value %s\n", event.EventType, event.Key, event.NewValue)
break
case "UPDATE":
log.Printf("got event: %s for Key %s with OldValue %s and NewValue %s\n", event.EventType, event.Key, event.OldValue, event.NewValue)
break
case "DELETE":
log.Printf("got event: %s for Key %s\n", event.EventType, event.Key)
break
}
}
}

func main() {
d, _ := nimbusdb.Open(&nimbusdb.Options{Path: DirPath})
d, _ := nimbusdb.Open(&nimbusdb.Options{Path: DirPath, WatchQueueSize: 10})
defer d.Close()

ch, _ := d.NewWatch()
defer d.CloseWatch()

// b, _ := d.NewBatch()
// d.Set([]byte("asdlal"), []byte("asdlkjas"), &nimbusdb.Options{ShouldWatch: true})

// b.Set([]byte("asdlal"), []byte("asdlkjas"))
// b.Commit()

go watchKeyChange(ch)

for {
reader := bufio.NewReader(os.Stdin)
fmt.Printf("> ")
Expand Down
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ module github.com/manosriram/nimbusdb

go 1.20

require github.com/stretchr/testify v1.8.4
require (
github.com/google/btree v1.1.2
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 5eb57f1

Please sign in to comment.