Skip to content

Commit

Permalink
add functions to get and set merge file pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
manosriram committed May 13, 2024
1 parent 9ef6000 commit 5bd3746
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
39 changes: 29 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Db struct {
activeDataFile string
activeDataFilePointer *os.File
mergeActiveDataFile string
mergeActiveDataFilePointer *os.File
activeMergeDataFilePointer *os.File

keyDir *BTree
opts *Options
Expand Down Expand Up @@ -98,13 +98,27 @@ func (db *Db) getLastOffset() int64 {
return db.lastOffset.Load()
}

func (db *Db) getActiveMergeDataFilePointer() (*os.File, error) {
if db.activeMergeDataFilePointer == nil {
return nil, ERROR_NO_ACTIVE_MERGE_FILE_OPENED
}
return db.activeMergeDataFilePointer, nil
}

func (db *Db) getActiveDataFilePointer() (*os.File, error) {
if db.activeDataFilePointer == nil {
return nil, ERROR_NO_ACTIVE_FILE_OPENED
}
return db.activeDataFilePointer, nil
}

func (db *Db) closeActiveMergeDataFilePointer() error {
if db.activeMergeDataFilePointer != nil {
return db.activeMergeDataFilePointer.Close()
}
return nil
}

func (db *Db) closeActiveDataFilePointer() error {
if db.activeDataFilePointer != nil {
return db.activeDataFilePointer.Close()
Expand Down Expand Up @@ -696,22 +710,26 @@ func (db *Db) initMergeDataFilePointer() {
if err != nil {
fmt.Println("error init merge ", err.Error())
}
db.mergeActiveDataFilePointer = file
db.activeMergeDataFilePointer = file
db.mergeActiveDataFile = file.Name()
}

func (db *Db) walk(s string, file fs.DirEntry, err error) error {

if db.mergeActiveDataFilePointer == nil {
if db.activeMergeDataFilePointer == nil {
db.initMergeDataFilePointer()
}
activeMergeFilePointer, err := db.getActiveMergeDataFilePointer()
if err != nil {
return err
}

if path.Ext(file.Name()) != InactiveKeyValueEntryDataFileSuffix {
return nil
}

oldPath := utils.JoinPaths(db.dirPath, file.Name())
newPath := utils.GetSwapFilePath(db.dirPath, db.mergeActiveDataFilePointer.Name())
newPath := utils.GetSwapFilePath(db.dirPath, db.activeMergeDataFilePointer.Name())
keys, d, err := db.getActiveKeyValueEntriesInFile(oldPath)
if err != nil {
return err
Expand All @@ -723,15 +741,15 @@ func (db *Db) walk(s string, file fs.DirEntry, err error) error {
return nil
}

info, err := db.mergeActiveDataFilePointer.Stat()
info, err := activeMergeFilePointer.Stat()
if err != nil {
return err
}

for _, z := range keys {
if (z.Endoffset-z.Startoffset)+info.Size() > DatafileThreshold {
fmt.Println("data threshold reached, moving swp to idfile")
db.mergeActiveDataFilePointer.Close()
db.closeActiveMergeDataFilePointer()
swapFilename := strings.Split(newPath, ".")[0]
err = os.Rename(newPath, fmt.Sprintf("%s.idfile", swapFilename))
if err != nil {
Expand All @@ -740,16 +758,17 @@ func (db *Db) walk(s string, file fs.DirEntry, err error) error {
db.initMergeDataFilePointer()
fmt.Println("creating new merge file ", db.mergeActiveDataFile)
}
db.mergeActiveDataFilePointer.WriteAt(d[z.Startoffset:z.Endoffset], z.Startoffset)
activeMergeFilePointer.WriteAt(d[z.Startoffset:z.Endoffset], z.Startoffset)
}
swapFilename := strings.Split(newPath, ".")[0]
err = os.Rename(newPath, fmt.Sprintf("%s.idfile", swapFilename))
return nil
}

// Syncs the database. Will remove all expired/deleted keys from disk.
// Since items are removed, disk usage will reduce.
func (db *Db) Sync() error {
// db.initMergeDataFilePointer()

func (db *Db) Merge() error {
defer db.closeActiveMergeDataFilePointer()
err := filepath.WalkDir(db.dirPath, db.walk)
if err != nil {
fmt.Println(err)
Expand Down
2 changes: 1 addition & 1 deletion examples/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func main() {
}
fmt.Println(string(z))
case "sync":
d.Sync()
d.Merge()
case "keyreader":
prefix := ""
d.KeyReader(prefix, func(k []byte) {
Expand Down
15 changes: 8 additions & 7 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ const (
)

var (
ERROR_KEY_NOT_FOUND = errors.New("key expired or does not exist")
ERROR_NO_ACTIVE_FILE_OPENED = errors.New("no file opened for writing")
ERROR_OFFSET_EXCEEDED_FILE_SIZE = errors.New("offset exceeded file size")
ERROR_CANNOT_READ_FILE = errors.New("error reading file")
ERROR_KEY_VALUE_SIZE_EXCEEDED = errors.New(fmt.Sprintf("exceeded limit of %d bytes", BlockSize))
ERROR_CRC_DOES_NOT_MATCH = errors.New("crc does not match. corrupted datafile")
ERROR_DB_CLOSED = errors.New("database is closed")
ERROR_KEY_NOT_FOUND = errors.New("key expired or does not exist")
ERROR_NO_ACTIVE_FILE_OPENED = errors.New("no file opened for writing")
ERROR_NO_ACTIVE_MERGE_FILE_OPENED = errors.New("no merge file opened for writing")
ERROR_OFFSET_EXCEEDED_FILE_SIZE = errors.New("offset exceeded file size")
ERROR_CANNOT_READ_FILE = errors.New("error reading file")
ERROR_KEY_VALUE_SIZE_EXCEEDED = errors.New(fmt.Sprintf("exceeded limit of %d bytes", BlockSize))
ERROR_CRC_DOES_NOT_MATCH = errors.New("crc does not match. corrupted datafile")
ERROR_DB_CLOSED = errors.New("database is closed")
)

var (
Expand Down

0 comments on commit 5bd3746

Please sign in to comment.