feat(merge): add an initial buggy merge feature
- option to run a Merge: removes all expired keys from DirPath
- TODO: 1 .swp file remains after the merge process
manosriram committed May 13, 2024
1 parent f357eeb commit 9ef6000
Showing 9 changed files with 297 additions and 164 deletions.
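
For orientation, here is a minimal, hypothetical driver for the new merge path. It is a sketch assuming the API exercised elsewhere in this diff (Open, SetWithTTL, Sync, Close) and an Options struct with a Path field; the ./dd/ directory mirrors the example path in examples/db/db.go below.

package main

import (
	"log"
	"time"

	"github.com/manosriram/nimbusdb"
)

func main() {
	// Assumed: Open takes *Options as in db_test.go and returns (*Db, error).
	db, err := nimbusdb.Open(&nimbusdb.Options{Path: "./dd/"})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Write a short-lived key; once its TTL lapses it only occupies disk space.
	if _, err := db.SetWithTTL([]byte("k"), []byte("v"), time.Second); err != nil {
		log.Fatal(err)
	}
	time.Sleep(2 * time.Second)

	// Sync walks the inactive (.idfile) datafiles and carries over only live
	// entries, reclaiming the space held by expired/deleted keys.
	if err := db.Sync(); err != nil {
		log.Fatal(err)
	}
}
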
2 changes: 2 additions & 0 deletions Makefile
@@ -9,6 +9,8 @@ build:

clean:
rm -rf ./nimbusdb_temp* benchmark/nimbusdb_temp*
rm -rf ~/nimbusdb/test_data
rm -rf ./dd

test:
go test -v -failfast
16 changes: 0 additions & 16 deletions batch.go
@@ -2,28 +2,12 @@ package nimbusdb

import (
"bytes"
"errors"
"sync"
"time"

"github.com/manosriram/nimbusdb/utils"
"github.com/segmentio/ksuid"
)

var (
ERROR_BATCH_CLOSED = errors.New("batch is closed")
ERROR_CANNOT_CLOSE_CLOSED_BATCH = errors.New("cannot close closed batch")
)

type Batch struct {
id ksuid.KSUID
db *Db
closed bool
batchlock sync.Mutex
mu sync.RWMutex
writeQueue []*KeyValuePair
}

func (db *Db) NewBatch() (*Batch, error) {
b := &Batch{
db: db,
247 changes: 122 additions & 125 deletions db.go
@@ -2,7 +2,6 @@ package nimbusdb

import (
"bytes"
"errors"
"fmt"
"io/fs"
"log"
@@ -22,98 +21,6 @@ import (
utils "github.com/manosriram/nimbusdb/utils"
)

const (
_ = iota
KB int64 = 1 << (10 * iota)
MB
GB
TB
PB
EB
)

const (
ActiveKeyValueEntryDatafileSuffix = ".dfile"
KeyValueEntryHintfileSuffix = ".hfile"
InactiveKeyValueEntryDataFileSuffix = ".idfile"
TempDataFilePattern = "*.dfile"
TempInactiveDataFilePattern = "*.idfile"
DefaultDataDir = "nimbusdb"

DatafileThreshold = 1 * MB
BlockSize = 32 * KB
)

const (
CrcSize int64 = 5
DeleteFlagSize = 1
TstampSize = 10
KeySizeSize = 10
ValueSizeSize = 10
StaticChunkSize = CrcSize + DeleteFlagSize + TstampSize + KeySizeSize + ValueSizeSize

CrcOffset int64 = 5
DeleteFlagOffset = 6
TstampOffset = 16
KeySizeOffset = 26
ValueSizeOffset = 36

BTreeDegree int = 10
)

const (
TotalStaticChunkSize int64 = TstampOffset + KeySizeOffset + ValueSizeOffset + DeleteFlagOffset + CrcOffset + StaticChunkSize
)

var (
ERROR_KEY_NOT_FOUND = errors.New("key expired or does not exist")
ERROR_NO_ACTIVE_FILE_OPENED = errors.New("no file opened for writing")
ERROR_OFFSET_EXCEEDED_FILE_SIZE = errors.New("offset exceeded file size")
ERROR_CANNOT_READ_FILE = errors.New("error reading file")
ERROR_KEY_VALUE_SIZE_EXCEEDED = errors.New(fmt.Sprintf("exceeded limit of %d bytes", BlockSize))
ERROR_CRC_DOES_NOT_MATCH = errors.New("crc does not match. corrupted datafile")
ERROR_DB_CLOSED = errors.New("database is closed")
)

const (
KEY_EXPIRES_IN_DEFAULT = 168 * time.Hour // 1 week

DELETED_FLAG_BYTE_VALUE = byte(0x31)
DELETED_FLAG_SET_VALUE = byte(0x01)
DELETED_FLAG_UNSET_VALUE = byte(0x00)

LRU_SIZE = 50
LRU_TTL = 24 * time.Hour

EXIT_NOT_OK = 0
EXIT_OK = 1

INITIAL_SEGMENT_OFFSET = 0
INITIAL_KEY_VALUE_ENTRY_OFFSET = 0
)

type Options struct {
IsMerge bool
MergeFilePath string
Path string
ShouldWatch bool
WatchQueueSize int
}

type KeyValuePair struct {
Key []byte
Value interface{}
Ttl time.Duration
}

type KeyDirValue struct {
offset int64
blockNumber int64
size int64
path string
tstamp int64
}

func NewKeyDirValue(offset, size, tstamp int64, path string) *KeyDirValue {
return &KeyDirValue{
offset: offset,
@@ -124,18 +31,22 @@ func NewKeyDirValue(offset, size, tstamp int64, path string) *KeyDirValue {
}

type Db struct {
dirPath string
closed bool
dataFilePath string
activeDataFile string
activeDataFilePointer *os.File
keyDir *BTree
opts *Options
lastOffset atomic.Int64
mu sync.RWMutex
segments map[string]*Segment
lru *expirable.LRU[int64, *Block]
watcher chan WatcherEvent
dirPath string
closed bool
dataFilePath string

activeDataFile string
activeDataFilePointer *os.File
mergeActiveDataFile string
mergeActiveDataFilePointer *os.File

keyDir *BTree
opts *Options
lastOffset atomic.Int64
mu sync.RWMutex
segments map[string]*Segment
lru *expirable.LRU[int64, *Block]
watcher chan WatcherEvent
}

func NewDb(dirPath string, opts ...*Options) *Db {
@@ -365,6 +276,61 @@ func (db *Db) getActiveFileKeyValueEntries(filePath string) ([]*KeyValueEntry, error) {
return keyValueEntries, nil
}

func (db *Db) getActiveKeyValueEntriesInFile(filePath string) ([]*ActiveKeyValueOffset, []byte, error) {
data, err := utils.ReadFile(filePath) // TODO: read in blocks
if err != nil {
return nil, nil, err
}

var validKeys []*ActiveKeyValueOffset
var previousOffset int64 = 0
var offset int64 = 0
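// Records are laid out back-to-back in the datafile; keyValueEntry.size
// advances the cursor from the start of one record to the next.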
for offset < int64(len(data)) {
keyValueEntry, err := getKeyValueEntryFromOffsetViaData(offset, data)
if err != nil && err != ERROR_KEY_NOT_FOUND {
return nil, nil, err
} else if err != nil && err == ERROR_KEY_NOT_FOUND {
previousOffset = offset
if int(offset+StaticChunkSize) > len(data) {
offset += keyValueEntry.size
break
}

offset += keyValueEntry.size
continue
}

keyValueEntry.fileID = utils.GetFilenameWithoutExtension(filePath)
hasTimestampExpired := utils.HasTimestampExpired(keyValueEntry.tstamp)
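// The entry is still live: record a byte span for the merge to copy
// verbatim into the swap file.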
if !hasTimestampExpired {
validKeys = append(validKeys, &ActiveKeyValueOffset{
Startoffset: previousOffset,
Endoffset: offset,
})
// fileName := utils.GetFilenameWithoutExtension(filePath)
// kdValue := KeyDirValue{
// offset: keyValueEntry.offset,
// size: keyValueEntry.size,
// path: fileName,
// tstamp: keyValueEntry.tstamp,
// }
// _, err := db.setKeyDir(keyValueEntry.k, kdValue) // TODO: use Set here?
// if err != nil {
// return err
// }
}

previousOffset = offset
if int(offset+StaticChunkSize) > len(data) {
offset += keyValueEntry.size
break
}

offset += keyValueEntry.size
}
return validKeys, data, nil
}
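
ActiveKeyValueOffset is not defined in the hunks shown here; judging from its use above and in walk below, a plausible shape is the following sketch (field names as used, int64 types assumed):

type ActiveKeyValueOffset struct {
	Startoffset int64 // where the byte span to keep begins
	Endoffset   int64 // end of the span, exclusive, as in d[Startoffset:Endoffset]
}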

func (db *Db) parseActiveKeyValueEntryFile(filePath string) error {
data, err := utils.ReadFile(filePath) // TODO: read in blocks
if err != nil {
@@ -553,7 +519,12 @@ func (db *Db) All() []*KeyValuePair {
func (db *Db) limitDatafileToThreshold(newKeyValueEntry *KeyValueEntry, opts *Options) {
var sz os.FileInfo
var err error
f, err := db.getActiveDataFilePointer()
var f *os.File
if opts.IsMerge {
f, err = os.Open(opts.CurrentMergeFilePath)
} else {
f, err = db.getActiveDataFilePointer()
}
sz, err = f.Stat()
if err != nil {
log.Fatal(err)
@@ -563,7 +534,7 @@
if size+newKeyValueEntry.size > DatafileThreshold {
if opts.IsMerge {
db.createInactiveDatafile(db.dirPath)
os.Remove(opts.MergeFilePath)
os.Remove(opts.CurrentMergeFilePath)
} else {
db.createActiveDatafile(db.dirPath)
newKeyValueEntry.offset = INITIAL_KEY_VALUE_ENTRY_OFFSET
@@ -720,42 +691,68 @@ func (db *Db) Delete(k []byte) ([]byte, error) {
return k, err
}

func (db *Db) initMergeDataFilePointer() {
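// Assumption: SwapFilePattern is declared in one of the files not shown in
// this diff, presumably a "*.swp"-style temp-file pattern (see the commit's
// TODO about a leftover .swp file).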
file, err := os.CreateTemp(db.dirPath, SwapFilePattern)
if err != nil {
fmt.Println("error init merge ", err.Error())
}
db.mergeActiveDataFilePointer = file
db.mergeActiveDataFile = file.Name()
}

func (db *Db) walk(s string, file fs.DirEntry, err error) error {

if db.mergeActiveDataFilePointer == nil {
db.initMergeDataFilePointer()
}

if path.Ext(file.Name()) != InactiveKeyValueEntryDataFileSuffix {
return nil
}

path := utils.JoinPaths(db.dirPath, file.Name())
db.setActiveDataFile(path)
db.setLastOffset(INITIAL_SEGMENT_OFFSET)
oldPath := utils.JoinPaths(db.dirPath, file.Name())
newPath := utils.GetSwapFilePath(db.dirPath, db.mergeActiveDataFilePointer.Name())
keys, d, err := db.getActiveKeyValueEntriesInFile(oldPath)
if err != nil {
return err
}

keyValueEntries, _ := db.getActiveFileKeyValueEntries(path)
if len(keyValueEntries) == 0 {
err = os.Remove(path)
if err != nil {
return err
}
if len(keys) == 0 {
os.Remove(oldPath)
os.Remove(newPath)
return nil
}

for _, keyValueEntry := range keyValueEntries {
db.limitDatafileToThreshold(keyValueEntry, &Options{
IsMerge: true,
MergeFilePath: path,
})
err := db.writeKeyValueEntry(keyValueEntry)
if err != nil {
return err
info, err := db.mergeActiveDataFilePointer.Stat()
if err != nil {
return err
}

for _, z := range keys {
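// If this span would push the swap file past DatafileThreshold, seal the
// swap file as an .idfile and start a fresh one.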
if (z.Endoffset-z.Startoffset)+info.Size() > DatafileThreshold {
fmt.Println("data threshold reached, moving swp to idfile")
db.mergeActiveDataFilePointer.Close()
swapFilename := strings.Split(newPath, ".")[0]
err = os.Rename(newPath, fmt.Sprintf("%s.idfile", swapFilename))
if err != nil {
return err
}
db.initMergeDataFilePointer()
fmt.Println("creating new merge file ", db.mergeActiveDataFile)
}
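// Copy the live span into the swap file at its offset from the old
// datafile (WriteAt writes at Startoffset rather than appending).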
db.mergeActiveDataFilePointer.WriteAt(d[z.Startoffset:z.Endoffset], z.Startoffset)
}
return nil
}

// Sync merges the database's datafiles, removing all expired/deleted keys from disk.
// Since those entries are dropped, disk usage is reduced.
func (db *Db) Sync() error {
// db.initMergeDataFilePointer()

err := filepath.WalkDir(db.dirPath, db.walk)
if err != nil {
log.Println(err)
fmt.Println(err)
return err
}

21 changes: 21 additions & 0 deletions db_test.go
@@ -162,6 +162,27 @@ func Test_InMemory_Delete(t *testing.T) {
})
}

func Test_StressSetWithTTL(t *testing.T) {
d, err := Open(opts)
defer d.Close()
assert.Equal(t, err, nil)
assert.NotEqual(t, d, nil)

for i := 0; i < 500000; i++ {
key := []byte(utils.GetTestKey(i))
value := []byte("testvalue")
_, err := d.SetWithTTL(key, value, time.Second*10)
assert.Nil(t, err)
}

for i := 0; i < 500000; i++ {
key := []byte(utils.GetTestKey(i))
value := []byte("testvalue")
_, err := d.SetWithTTL(key, value, time.Second*25)
assert.Nil(t, err)
}
}

func Test_StressSet(t *testing.T) {
d, err := Open(opts)
defer d.Close()
3 changes: 2 additions & 1 deletion examples/db/db.go
@@ -11,7 +11,8 @@ import (
)

const (
DirPath = "/Users/manosriram/nimbusdb/test_data"
// DirPath = "/Users/manosriram/nimbusdb/test_data"
DirPath = "./dd/"
)

func watchKeyChange(ch chan nimbusdb.WatcherEvent) {
(4 of the 9 changed files did not load and are not shown)
