diff --git a/.gitignore b/.gitignore index 4c2cc91..541ce7e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ concurrent_test_data nimbusdb_temp* benchmark/nimbusdb_temp* gcreport.txt +*.dfile +*.idfile diff --git a/README.md b/README.md index 60e423c..73d8075 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,61 @@ b.Commit() // write all pending writes to disk b.Rollback() // discard all pending writes ``` +#### Watch keys +```go +func watchKeyChange(ch chan nimbusdb.WatcherEvent) { + for event := range ch { + switch event.EventType { + case "CREATE": + // Handle create key event + break + + case "UPDATE": + // Handle update key event + break + + case "DELETE": + // Handle delete key event + break + } + } +} + +func main() { + d, err := nimbusdb.Open(&nimbusdb.Options{Path: "/path/to/data/directory", ShouldWatch: true}) + if err != nil { + // handle error + } + defer d.Close() + defer d.CloseWatch() // optional + + watchChannel, err := d.Watch() + if err != nil { + // handle error + } + + go watchEvents(watchChannel) + + kvPair := &nimbusdb.KeyValuePair{ + Key: []byte("key"), + Value: []byte("value"), + } + setValue, err := d.Set(kvPair) // will trigger an CREATE event + if err != nil { + // handle error + } + + setValue, err := d.Set(kvPair) // will trigger an UPDATE event + if err != nil { + // handle error + } + + err = d.Delete(kvPair.Key) // will trigger an DELETE event + if err != nil { + // handle error + } +} +``` [Progress Board](https://trello.com/b/2eDSLLb3/nimbusdb) | [Streams](https://youtube.com/playlist?list=PLJALjJgNSDVo5veOf2apgMIE1QgN7IEfk) | [godoc](https://pkg.go.dev/github.com/manosriram/nimbusdb) diff --git a/batch.go b/batch.go index 4861368..d680f80 100644 --- a/batch.go +++ b/batch.go @@ -7,6 +7,7 @@ import ( "time" "github.com/manosriram/nimbusdb/utils" + "github.com/segmentio/ksuid" ) var ( @@ -15,6 +16,7 @@ var ( ) type Batch struct { + id ksuid.KSUID db *Db closed bool batchlock sync.Mutex @@ -26,6 +28,7 @@ func (db *Db) NewBatch() (*Batch, error) { b := &Batch{ db: db, closed: false, + id: ksuid.New(), } b.batchlock.Lock() return b, nil @@ -188,16 +191,34 @@ func (b *Batch) Commit() error { if b.closed { return ERROR_BATCH_CLOSED } - var err error + for i := range b.writeQueue { + k := b.writeQueue[i].Key + v := utils.Encode(b.writeQueue[i].Value) + var existingValueForKey []byte + existingValueEntryForKey, err := b.db.Get(k) + if err != nil { + existingValueForKey = nil + } else { + existingValueForKey = existingValueEntryForKey + } + if b.writeQueue[i].Ttl == 0 { - _, err = b.db.Set(b.writeQueue[i].Key, utils.Encode(b.writeQueue[i].Value)) + _, err = b.db.Set(k, v) } else { - _, err = b.db.SetWithTTL(b.writeQueue[i].Key, utils.Encode(b.writeQueue[i].Value), b.writeQueue[i].Ttl) + _, err = b.db.SetWithTTL(k, v, b.writeQueue[i].Ttl) } if err != nil { return err } + + if b.db.opts.ShouldWatch { + if existingValueForKey == nil { + b.db.SendWatchEvent(NewCreateWatcherEvent(k, existingValueForKey, v, &b.id)) + } else { + b.db.SendWatchEvent(NewUpdateWatcherEvent(k, existingValueForKey, v, &b.id)) + } + } } b.writeQueue = nil return nil diff --git a/db.go b/db.go index fc9fc22..60fa6d0 100644 --- a/db.go +++ b/db.go @@ -93,9 +93,11 @@ const ( ) type Options struct { - IsMerge bool - MergeFilePath string - Path string + IsMerge bool + MergeFilePath string + Path string + ShouldWatch bool + WatchQueueSize int } type KeyValuePair struct { @@ -133,9 +135,10 @@ type Db struct { mu sync.RWMutex segments map[string]*Segment lru *expirable.LRU[int64, *Block] + watcher chan WatcherEvent } -func NewDb(dirPath string) *Db { +func NewDb(dirPath string, opts ...*Options) *Db { segments := make(map[string]*Segment, 0) db := &Db{ dirPath: dirPath, @@ -145,6 +148,19 @@ func NewDb(dirPath string) *Db { }, lru: expirable.NewLRU[int64, *Block](LRU_SIZE, nil, LRU_TTL), segments: segments, + opts: &Options{ + ShouldWatch: false, + }, + } + + db.watcher = make(chan WatcherEvent, func() int { + if len(opts) > 0 { + return opts[0].WatchQueueSize + } + return 0 + }()) + if len(opts) > 0 { + db.opts.ShouldWatch = opts[0].ShouldWatch } return db @@ -227,7 +243,6 @@ func (db *Db) setKeyDir(key []byte, kdValue KeyDirValue) (interface{}, error) { } db.removeBlockCache(segment.getBlockNumber()) } - db.keyDir.Set(key, kdValue) db.lastOffset.Store(kdValue.offset + kdValue.size) @@ -471,7 +486,7 @@ func Open(opts *Options) (*Db, error) { dirPath = utils.JoinPaths(home, DefaultDataDir) } - db := NewDb(dirPath) + db := NewDb(dirPath, opts) go db.handleInterrupt() err := os.MkdirAll(dirPath, os.ModePerm) @@ -527,6 +542,7 @@ func (db *Db) Close() error { } } db.closed = true + close(db.watcher) return nil } @@ -592,6 +608,16 @@ func (db *Db) Set(k []byte, v []byte) ([]byte, error) { intKSz := int64(len(k)) intVSz := int64(len(utils.Encode(v))) + var existingValueForKey []byte + if db.opts.ShouldWatch { + existingValueEntryForKey, err := db.Get(k) + if err != nil { + existingValueForKey = nil + } else { + existingValueForKey = existingValueEntryForKey + } + } + newKeyValueEntry := &KeyValueEntry{ deleted: DELETED_FLAG_UNSET_VALUE, offset: db.getLastOffset(), @@ -614,10 +640,19 @@ func (db *Db) Set(k []byte, v []byte) ([]byte, error) { kdValue := NewKeyDirValue(newKeyValueEntry.offset, newKeyValueEntry.size, newKeyValueEntry.tstamp, utils.GetFilenameWithoutExtension(db.activeDataFile)) _, err = db.setKeyDir(k, *kdValue) + if err != nil { return nil, err } + // do not watch if ShouldWatch is set with options + if db.opts.ShouldWatch { + if existingValueForKey == nil { + db.SendWatchEvent(NewCreateWatcherEvent(k, existingValueForKey, v, nil)) + } else { + db.SendWatchEvent(NewUpdateWatcherEvent(k, existingValueForKey, v, nil)) + } + } return v, err } @@ -625,6 +660,16 @@ func (db *Db) SetWithTTL(k []byte, v []byte, ttl time.Duration) (interface{}, er intKSz := int64(len(k)) intVSz := int64(len(utils.Encode(v))) + var existingValueForKey []byte + if db.opts.ShouldWatch { + existingValueEntryForKey, err := db.Get(k) + if err != nil { + existingValueForKey = nil + } else { + existingValueForKey = existingValueEntryForKey + } + } + newKeyValueEntry := &KeyValueEntry{ deleted: DELETED_FLAG_UNSET_VALUE, offset: db.getLastOffset(), @@ -651,6 +696,14 @@ func (db *Db) SetWithTTL(k []byte, v []byte, ttl time.Duration) (interface{}, er return nil, err } + // do not watch if ShouldWatch is set with options + if db.opts.ShouldWatch { + if existingValueForKey == nil { + db.SendWatchEvent(NewCreateWatcherEvent(k, existingValueForKey, v, nil)) + } else { + db.SendWatchEvent(NewUpdateWatcherEvent(k, existingValueForKey, v, nil)) + } + } return v, err } @@ -661,6 +714,9 @@ func (db *Db) Delete(key []byte) error { defer db.mu.Unlock() err := db.deleteKey(key) + if db.opts.ShouldWatch { + db.SendWatchEvent(NewDeleteWatcherEvent(key, nil, nil, nil)) + } return err } diff --git a/db_test.go b/db_test.go index 1cd13bc..f59acfc 100644 --- a/db_test.go +++ b/db_test.go @@ -12,7 +12,8 @@ import ( ) var opts = &Options{ - Path: utils.DbDir(), + Path: utils.DbDir(), + ShouldWatch: false, } const ( diff --git a/examples/b.go b/examples/b.go index cd82460..65a71d2 100644 --- a/examples/b.go +++ b/examples/b.go @@ -3,12 +3,20 @@ package main import ( "fmt" - "golang.org/x/exp/slices" + "github.com/manosriram/nimbusdb" ) func main() { - x := []int{1, 2, 3} + d, _ := nimbusdb.Open(&nimbusdb.Options{ + Path: "./", + ShouldWatch: true, + }) - slices.Delete(x, 0, 1) - fmt.Println(x) + ch, _ := d.NewWatch() + + k := []byte("testkey1") + v := []byte("testvalue1") + d.Set(k, v) + + fmt.Printf("%v\n", <-ch) } diff --git a/examples/db/db.go b/examples/db/db.go index 161b18c..520568f 100644 --- a/examples/db/db.go +++ b/examples/db/db.go @@ -3,6 +3,7 @@ package main import ( "bufio" "fmt" + "log" "os" "strings" @@ -13,9 +14,37 @@ const ( DirPath = "/Users/manosriram/nimbusdb/test_data" ) +func watchKeyChange(ch chan nimbusdb.WatcherEvent) { + for event := range ch { + switch event.EventType { + case "CREATE": + log.Printf("got event: %s for Key %s with Value %s\n", event.EventType, event.Key, event.NewValue) + break + case "UPDATE": + log.Printf("got event: %s for Key %s with OldValue %s and NewValue %s\n", event.EventType, event.Key, event.OldValue, event.NewValue) + break + case "DELETE": + log.Printf("got event: %s for Key %s\n", event.EventType, event.Key) + break + } + } +} + func main() { - d, _ := nimbusdb.Open(&nimbusdb.Options{Path: DirPath}) + d, _ := nimbusdb.Open(&nimbusdb.Options{Path: DirPath, WatchQueueSize: 10}) defer d.Close() + + ch, _ := d.NewWatch() + defer d.CloseWatch() + + // b, _ := d.NewBatch() + // d.Set([]byte("asdlal"), []byte("asdlkjas"), &nimbusdb.Options{ShouldWatch: true}) + + // b.Set([]byte("asdlal"), []byte("asdlkjas")) + // b.Commit() + + go watchKeyChange(ch) + for { reader := bufio.NewReader(os.Stdin) fmt.Printf("> ") diff --git a/go.mod b/go.mod index 65919c9..d4ed452 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,16 @@ module github.com/manosriram/nimbusdb go 1.20 -require github.com/stretchr/testify v1.8.4 +require ( + github.com/google/btree v1.1.2 + github.com/hashicorp/golang-lru/v2 v2.0.7 + github.com/stretchr/testify v1.8.4 + golang.org/x/exp v0.0.0-20240119083558-1b970713d09a +) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/btree v1.1.2 // indirect - github.com/google/uuid v1.4.0 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/segmentio/ksuid v1.0.4 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 69d270a..8d5bd17 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,26 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/matoous/go-nanoid v1.5.0 h1:VRorl6uCngneC4oUQqOYtO3S0H5QKFtKuKycFG3euek= +github.com/matoous/go-nanoid v1.5.0/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= +github.com/matoous/go-nanoid/v2 v2.0.0 h1:d19kur2QuLeHmJBkvYkFdhFBzLoo1XVm2GgTpL+9Tj0= +github.com/matoous/go-nanoid/v2 v2.0.0/go.mod h1:FtS4aGPVfEkxKxhdWPAspZpZSh1cOjtM7Ej/So3hR0g= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= +github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA= +golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/watch.go b/watch.go new file mode 100644 index 0000000..545ee78 --- /dev/null +++ b/watch.go @@ -0,0 +1,89 @@ +package nimbusdb + +import ( + "time" + + "github.com/segmentio/ksuid" +) + +type EventType string + +const ( + Create EventType = "CREATE" + Update EventType = "UPDATE" + Delete EventType = "DELETE" +) + +type WatcherEvent struct { + EventType EventType + Key []byte + OldValue []byte + NewValue []byte + EventTimestamp time.Time + BatchId *ksuid.KSUID +} + +func (db *Db) NewWatch() (chan WatcherEvent, error) { + if db.closed { + return nil, ERROR_DB_CLOSED + } + return db.watcher, nil +} + +func (db *Db) CloseWatch() error { + if db.closed { + return ERROR_DB_CLOSED + } + close(db.watcher) + return nil +} + +func NewCreateWatcherEvent(key, oldValue, newValue []byte, batchId *ksuid.KSUID) WatcherEvent { + w := WatcherEvent{ + EventType: Create, + Key: key, + NewValue: newValue, + EventTimestamp: time.Now(), + BatchId: batchId, + } + if oldValue != nil { + w.OldValue = oldValue + } + return w +} + +func NewUpdateWatcherEvent(key, oldValue, newValue []byte, batchId *ksuid.KSUID) WatcherEvent { + w := WatcherEvent{ + EventType: Update, + Key: key, + NewValue: newValue, + EventTimestamp: time.Now(), + BatchId: batchId, + } + if oldValue != nil { + w.OldValue = oldValue + } + return w +} + +func NewDeleteWatcherEvent(key, oldValue, newValue []byte, batchId *ksuid.KSUID) WatcherEvent { + w := WatcherEvent{ + EventType: Delete, + Key: key, + NewValue: newValue, + EventTimestamp: time.Now(), + BatchId: batchId, + } + if oldValue != nil { + w.OldValue = oldValue + } + return w +} + +func (db *Db) SendWatchEvent(w WatcherEvent) error { + if db.closed { + return ERROR_DB_CLOSED + } + db.watcher <- w + return nil +} diff --git a/watch_test.go b/watch_test.go new file mode 100644 index 0000000..ec7966a --- /dev/null +++ b/watch_test.go @@ -0,0 +1,100 @@ +package nimbusdb + +import ( + "os" + "testing" + + "github.com/manosriram/nimbusdb/utils" + "github.com/stretchr/testify/assert" +) + +func Test_Watch_Set(t *testing.T) { + d, err := Open(&Options{ + Path: utils.DbDir(), + WatchQueueSize: 10, + ShouldWatch: true, + }) + defer d.Close() + defer d.CloseWatch() + assert.Equal(t, err, nil) + assert.NotEqual(t, d, nil) + + ch, err := d.NewWatch() + assert.Nil(t, err) + + k := []byte("testkey1") + v := []byte("testvalue1") + _, err = d.Set(k, v) + assert.Nil(t, err) + + event := <-ch + assert.Equal(t, Create, event.EventType) + + t.Cleanup(func() { + os.RemoveAll(opts.Path) + }) +} + +func Test_Watch_Update(t *testing.T) { + d, err := Open(&Options{ + Path: utils.DbDir(), + WatchQueueSize: 10, + ShouldWatch: true, + }) + defer d.Close() + defer d.CloseWatch() + assert.Equal(t, err, nil) + assert.NotEqual(t, d, nil) + + ch, err := d.NewWatch() + assert.Nil(t, err) + + k := []byte("testkey1") + v := []byte("testvalue1") + _, err = d.Set(k, v) + _, err = d.Set(k, v) + assert.Nil(t, err) + + createEvent := <-ch + assert.Equal(t, Create, createEvent.EventType) + + updateEvent := <-ch + assert.Equal(t, Update, updateEvent.EventType) + + t.Cleanup(func() { + os.RemoveAll(opts.Path) + }) +} + +func Test_Watch_Delete(t *testing.T) { + d, err := Open(&Options{ + Path: utils.DbDir(), + WatchQueueSize: 10, + ShouldWatch: true, + }) + defer d.Close() + defer d.CloseWatch() + assert.Equal(t, err, nil) + assert.NotEqual(t, d, nil) + + ch, err := d.NewWatch() + assert.Nil(t, err) + + k := []byte("testkey1") + v := []byte("testvalue1") + _, err = d.Set(k, v) + assert.Nil(t, err) + + err = d.Delete(k) + assert.Nil(t, err) + + createEvent := <-ch + assert.Equal(t, Create, createEvent.EventType) + + deleteEvent := <-ch + assert.Equal(t, Delete, deleteEvent.EventType) + + t.Cleanup(func() { + os.RemoveAll(opts.Path) + }) +}