Close active datafile before making a new active datafile, other minor fixes
manosriram committed Dec 3, 2023
Parent 1f505af, commit 821b1cb
Showing 6 changed files with 47 additions and 29 deletions.
.gitignore: 1 addition, 0 deletions
@@ -4,3 +4,4 @@ concurrent_test_data
 .DS_Store
 .vscode
 tests/nimbusdb*
+benchmark/nimbusdb*
Makefile: 1 addition, 4 deletions
@@ -5,10 +5,7 @@ build:
     go build -v
 
 clean:
-    rm -rf tests/nimbusdb*
-
-test_clean:
-    rm -rf tests/nimbusdb*
+    rm -rf tests/nimbusdb* benchmark/nimbusdb*
 
 test:
     go test ./tests -v
benchmark/bench_test.go: 1 addition, 0 deletions
@@ -14,6 +14,7 @@ func BenchmarkGetSet(b *testing.B) {
         Path: utils.DbDir(),
     }
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     if err != nil {
         log.Fatal(err)
     }
db.go: 30 additions, 18 deletions
@@ -49,8 +49,10 @@ const (
 )
 
 const (
-    KEY_EXPIRES_IN_DEFAULT = 24 * time.Hour
-    KEY_NOT_FOUND          = "key expired or does not exist"
+    KEY_EXPIRES_IN_DEFAULT    = 24 * time.Hour
+    KEY_NOT_FOUND             = "key expired or does not exist"
+    NO_ACTIVE_FILE_OPENED     = "no file opened for writing"
+    OFFSET_EXCEEDED_FILE_SIZE = "offset exceeded file size"
 )
 
 type Segment struct {
@@ -64,12 +66,6 @@ type Segment struct
     v []byte
 }
 
-type Options struct {
-    IsMerge       bool
-    MergeFilePath string
-    Path          string
-}
-
 func (s *Segment) BlockSize() int {
     return BlockSize + len(s.k) + len(s.v)
 }
@@ -97,6 +93,12 @@ func (s *Segment) ToByte() []byte {
     return segmentInBytes
 }
 
+type Options struct {
+    IsMerge       bool
+    MergeFilePath string
+    Path          string
+}
+
 type KeyValuePair struct {
     Key   []byte
     Value interface{}
@@ -121,7 +123,6 @@ type Db struct {
     opts *Options
 }
 
-// TODO: use dirPath here
 func NewDb(dirPath string) *Db {
     keyDir := make(map[string]KeyDirValue, 0)
     db := &Db{
@@ -136,11 +137,19 @@ func (db *Db) LastOffset() int64 {
     return db.lastOffset
 }
 
-func (db *Db) getActiveDataFilePointer() *os.File {
-    return db.activeDataFilePointer
+func (db *Db) getActiveDataFilePointer() (*os.File, error) {
+    if db.activeDataFilePointer == nil {
+        return nil, errors.New(NO_ACTIVE_FILE_OPENED)
+    }
+    return db.activeDataFilePointer, nil
 }
 
 func (db *Db) setActiveDataFile(activeDataFile string) error {
+    err := db.Close(&Options{}) // close existing active datafile
+    if err != nil {
+        return err
+    }
+
     f, err := os.OpenFile(activeDataFile, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
     if err != nil {
         return err
@@ -188,7 +197,7 @@ func (db *Db) getKeyDir(key string) (*Segment, error) {
 func getSegmentFromOffset(offset int64, data []byte) (*Segment, error) {
     // get timestamp
     if int(offset+BlockSize) > len(data) {
-        return nil, fmt.Errorf("exceeded data array length")
+        return nil, errors.New(OFFSET_EXCEEDED_FILE_SIZE)
     }
     tstamp := data[offset : offset+TstampOffset]
     tstamp64Bit := utils.ByteToInt64(tstamp)
@@ -207,13 +216,13 @@ func getSegmentFromOffset(offset int64, data []byte) (*Segment, error) {
     intVsz := utils.ByteToInt64(vsz)
 
     if int(offset+ValueSizeOffset+intKsz) > len(data) {
-        return nil, fmt.Errorf("exceeded data array length")
+        return nil, fmt.Errorf(OFFSET_EXCEEDED_FILE_SIZE)
     }
     // get key
     k := data[offset+ValueSizeOffset : offset+ValueSizeOffset+intKsz]
 
     if int(offset+ValueSizeOffset+intKsz+intVsz) > len(data) {
-        return nil, fmt.Errorf("exceeded data array length")
+        return nil, fmt.Errorf(OFFSET_EXCEEDED_FILE_SIZE)
     }
     // get value
     v := data[offset+ValueSizeOffset+intKsz : offset+ValueSizeOffset+intKsz+intVsz]
@@ -289,7 +298,6 @@ func (db *Db) getActiveFileSegments(filePath string) ([]*Segment, error) {
             return nil, err
         }
 
-        // segment.fileID = strings.Split(path.Base(filePath), ".")[0]
         segment.fileID = strings.Split(utils.GetFilenameWithoutExtension(filePath), ".")[0]
         hasTimestampExpired := utils.HasTimestampExpired(segment.tstamp)
         if !hasTimestampExpired {
@@ -387,8 +395,11 @@ func (db *Db) CreateActiveDatafile(dirPath string) error {
 }
 
 func (db *Db) Close(opts *Options) error {
-    err := db.activeDataFilePointer.Close()
-    return err
+    if db.activeDataFilePointer != nil {
+        err := db.activeDataFilePointer.Close()
+        return err
+    }
+    return nil
 }
 
 func Open(opts *Options) (*Db, error) {
@@ -472,7 +483,8 @@ func (db *Db) Get(key []byte) ([]byte, error) {
 func (db *Db) LimitDatafileToThreshold(add int64, opts *Options) {
     var sz os.FileInfo
     var err error
-    sz, err = db.getActiveDataFilePointer().Stat()
+    f, err := db.getActiveDataFilePointer()
+    sz, err = f.Stat()
     if err != nil {
         log.Fatal(err)
     }
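Taken together, the db.go changes close any previously active datafile before a new one is opened, and the pointer accessor now returns NO_ACTIVE_FILE_OPENED instead of handing back a possibly nil *os.File; that same guarded accessor is what WriteSegment in wal.go (further down) consults before writing. Below is a minimal, self-contained sketch of the close-before-reopen pattern. The datafileWriter type, file naming, and main function are invented for illustration and are not part of nimbusdb's API.

// datafile_rotation_sketch.go: a simplified illustration of the pattern this
// commit introduces in db.go. datafileWriter is hypothetical; nimbusdb's real
// Db type carries much more state.
package main

import (
    "errors"
    "fmt"
    "os"
    "path/filepath"
)

var errNoActiveFile = errors.New("no file opened for writing")

type datafileWriter struct {
    active *os.File // nil until the first datafile is opened
}

// activeFile mirrors getActiveDataFilePointer: it returns an error instead of
// a nil pointer when nothing is open.
func (w *datafileWriter) activeFile() (*os.File, error) {
    if w.active == nil {
        return nil, errNoActiveFile
    }
    return w.active, nil
}

// setActiveDatafile closes any previously opened datafile before opening the
// next one, so rotation never leaks file descriptors.
func (w *datafileWriter) setActiveDatafile(path string) error {
    if err := w.close(); err != nil {
        return err
    }
    f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
    if err != nil {
        return err
    }
    w.active = f
    return nil
}

// close is a no-op when nothing is open, matching the guarded Close in db.go.
func (w *datafileWriter) close() error {
    if w.active == nil {
        return nil
    }
    return w.active.Close()
}

func main() {
    dir, err := os.MkdirTemp("", "datafile-rotation")
    if err != nil {
        panic(err)
    }
    defer os.RemoveAll(dir)

    w := &datafileWriter{}
    if _, err := w.activeFile(); err != nil {
        fmt.Println("before opening:", err) // "no file opened for writing"
    }
    for i := 0; i < 3; i++ {
        // Each rotation closes the previous datafile first.
        if err := w.setActiveDatafile(filepath.Join(dir, fmt.Sprintf("%d.data", i))); err != nil {
            panic(err)
        }
    }
    defer w.close()
    f, _ := w.activeFile()
    fmt.Println("active datafile:", f.Name())
}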
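The bounds checks in getSegmentFromOffset above now fail with OFFSET_EXCEEDED_FILE_SIZE whenever a block would read past the end of the data. The sketch below shows the same decode-with-bounds-checks idea on a deliberately simplified record layout (an 8-byte key length followed by the key bytes); the layout, names, and constants are invented for the example, and nimbusdb's real block format additionally carries a timestamp and a value size.

// decode_sketch.go: a simplified version of the bounds-checked decoding done
// in getSegmentFromOffset. The record layout here is hypothetical.
package main

import (
    "encoding/binary"
    "errors"
    "fmt"
)

const keySizeWidth = 8 // bytes used to store the key length

var errOffsetExceededFileSize = errors.New("offset exceeded file size")

// decodeKeyAt reads one length-prefixed key starting at offset, checking that
// every slice expression stays inside data before indexing into it.
func decodeKeyAt(data []byte, offset int64) (key []byte, next int64, err error) {
    if int(offset+keySizeWidth) > len(data) {
        return nil, 0, errOffsetExceededFileSize
    }
    ksz := int64(binary.BigEndian.Uint64(data[offset : offset+keySizeWidth]))

    if int(offset+keySizeWidth+ksz) > len(data) {
        return nil, 0, errOffsetExceededFileSize
    }
    key = data[offset+keySizeWidth : offset+keySizeWidth+ksz]
    return key, offset + keySizeWidth + ksz, nil
}

func main() {
    // Encode two keys back to back, then decode them with bounds checks.
    var data []byte
    for _, k := range []string{"alpha", "beta"} {
        var sz [keySizeWidth]byte
        binary.BigEndian.PutUint64(sz[:], uint64(len(k)))
        data = append(data, sz[:]...)
        data = append(data, k...)
    }

    for offset := int64(0); int(offset) < len(data); {
        key, next, err := decodeKeyAt(data, offset)
        if err != nil {
            fmt.Println("decode error:", err)
            return
        }
        fmt.Printf("key %q at offset %d\n", key, offset)
        offset = next
    }

    // A truncated buffer trips the bounds check instead of causing a slice panic.
    if _, _, err := decodeKeyAt(data[:10], 0); err != nil {
        fmt.Println("truncated read:", err)
    }
}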
tests/db_test.go: 9 additions, 0 deletions
@@ -24,12 +24,14 @@ const (
 
 func TestDbOpen(t *testing.T) {
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     assert.Equal(t, err, nil)
     assert.NotEqual(t, d, nil)
 }
 
 func Test_InMemory_SetGet_With_Expiry(t *testing.T) {
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     assert.Equal(t, err, nil)
     assert.NotEqual(t, d, nil)

@@ -54,6 +56,7 @@ func Test_InMemory_SetGet_With_Expiry(t *testing.T) {
 
 func Test_InMemory_SetGet(t *testing.T) {
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     assert.Equal(t, err, nil)
     assert.NotEqual(t, d, nil)

@@ -72,6 +75,7 @@ func Test_InMemory_SetGet(t *testing.T) {
 
 func Test_Set(t *testing.T) {
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     assert.Equal(t, err, nil)
     assert.NotEqual(t, d, nil)

@@ -86,6 +90,7 @@ func Test_Set(t *testing.T) {
 
 func Test_Get(t *testing.T) {
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     assert.Equal(t, err, nil)
     assert.NotEqual(t, d, nil)

@@ -103,6 +108,7 @@ func Test_Get(t *testing.T) {
 
 func Test_StressSet(t *testing.T) {
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     assert.Equal(t, err, nil)
     assert.NotEqual(t, d, nil)

@@ -120,6 +126,7 @@ func Test_StressSet(t *testing.T) {
 
 func Test_StressGet(t *testing.T) {
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     assert.Equal(t, err, nil)
     assert.NotEqual(t, d, nil)

@@ -139,6 +146,7 @@ func Test_StressGet(t *testing.T) {
 
 func Test_ConcurrentSet(t *testing.T) {
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     assert.Equal(t, err, nil)
     assert.NotEqual(t, d, nil)

@@ -164,6 +172,7 @@ func Test_ConcurrentSet(t *testing.T) {
 
 func Test_ConcurrentGet(t *testing.T) {
     d, err := nimbusdb.Open(opts)
+    defer d.Close(opts)
     assert.Equal(t, err, nil)
     assert.NotEqual(t, d, nil)

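Every test above now pairs nimbusdb.Open with a deferred Close, so the datafile held by one test is released before the next test reopens the same directory. Below is a small sketch of that lifecycle, assuming the testify assert package behind the existing assertions, the utils.DbDir() helper used by the benchmark, and a placeholder package and test name; the defer is placed after the error and nil checks here so a failed Open cannot lead to a nil dereference.

// open_close_pattern_test.go: sketch of the Open / deferred Close lifecycle
// this commit adds to every test. The real tests share a package-level opts.
package tests

import (
    "testing"

    "github.com/manosriram/nimbusdb"
    "github.com/manosriram/nimbusdb/utils"
    "github.com/stretchr/testify/assert"
)

func Test_OpenClosePattern(t *testing.T) {
    opts := &nimbusdb.Options{Path: utils.DbDir()}

    d, err := nimbusdb.Open(opts)
    assert.Equal(t, err, nil)
    assert.NotEqual(t, d, nil)
    // Close releases the active datafile when the test returns, so the next
    // test that opens the same directory starts from a closed file.
    defer d.Close(opts)
}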
wal.go: 5 additions, 7 deletions
@@ -7,14 +7,12 @@ import (
     "github.com/manosriram/nimbusdb/utils"
 )
 
-func openFile(path string) (*os.File, error) {
-    f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
-    return f, err
-}
-
 func (db *Db) WriteSegment(segment *Segment) error {
-    f := db.getActiveDataFilePointer()
-    _, err := f.Write(segment.ToByte())
+    f, err := db.getActiveDataFilePointer()
+    if err != nil {
+        return err
+    }
+    _, err = f.Write(segment.ToByte())
     return err
 }
