Skip to content

Commit

Permalink
fix(gc): reduce allocations
Browse files Browse the repository at this point in the history
reduce set allocs to 11/op from 18/op

reduce get allocs to 5/op from 7/op
  • Loading branch information
manosriram committed May 16, 2024
1 parent c225eec commit 91aee0a
Show file tree
Hide file tree
Showing 6 changed files with 880 additions and 39 deletions.
10 changes: 2 additions & 8 deletions benchmark/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,14 @@ func set(b *testing.B) {

func get(b *testing.B) {
for i := 0; i < 10000; i++ {
key := []byte(utils.GetTestKey(i))
value := []byte("testvalue")
_, err := db.Set(key, value)
_, err := db.Set([]byte(utils.GetTestKey(i)), []byte("testvalue"))
assert.Nil(b, err)
}
b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
kv := &nimbusdb.KeyValuePair{
Key: []byte(utils.GetTestKey(rand.Int())),
Value: []byte("testvalue"),
}
_, err := db.Get(kv.Key)
_, err := db.Get([]byte(utils.GetTestKey(rand.Int())))
if err != nil && err != nimbusdb.ERROR_KEY_NOT_FOUND {
log.Fatal(err)
}
Expand Down
16 changes: 15 additions & 1 deletion btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@ type item struct {
v KeyDirValue
}

var itemPool = sync.Pool{
New: func() interface{} {
return &item{}
},
}

func (it item) Less(i btree.Item) bool {
return bytes.Compare(it.key, i.(*item).key) < 0
}

func (b *BTree) Get(key []byte) *KeyDirValue {
i := b.tree.Get(&item{key: key})
it := itemPool.Get().(*item)
it.key = key
defer itemPool.Put(it)

i := b.tree.Get(it)
if i == nil {
return nil
}
Expand All @@ -40,6 +50,10 @@ func (b *BTree) Set(key []byte, value KeyDirValue) *KeyDirValue {
}

func (b *BTree) Delete(key []byte) *KeyValuePair {
it := itemPool.Get().(*item)
it.key = key
defer itemPool.Put(it)

i := b.tree.Delete(&item{key: key})
if i != nil {
x := i.(*item)
Expand Down
55 changes: 39 additions & 16 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -50,6 +49,12 @@ type Db struct {
watcher chan WatcherEvent
}

var stringBuilderPool = sync.Pool{
New: func() interface{} {
return &strings.Builder{}
},
}

func NewDb(dirPath string, opts ...*Options) *Db {
segments := make(map[string]*Segment, 0)
db := &Db{
Expand Down Expand Up @@ -190,7 +195,17 @@ func (db *Db) setKeyDir(key []byte, kdValue KeyDirValue) (interface{}, error) {

segment, ok := db.getSegment(kdValue.path)
if !ok {
newSegment := createNewSegment(&kdValue)
newSegment := &Segment{
blocks: map[int64]*BlockOffsetPair{
0: {startOffset: kdValue.offset,
endOffset: kdValue.offset + kdValue.size,
filePath: kdValue.path,
},
},
path: kdValue.path,
currentBlockNumber: 0,
currentBlockOffset: 0,
}
fp, err := db.getSegmentFilePointerFromPath(kdValue.path)
if err != nil {
return nil, err
Expand All @@ -211,7 +226,6 @@ func (db *Db) setKeyDir(key []byte, kdValue KeyDirValue) (interface{}, error) {

return kdValue, nil
}

func (db *Db) getKeyDir(key []byte) (*KeyValueEntry, error) {
var cacheBlock = new(Block)

Expand Down Expand Up @@ -254,12 +268,8 @@ func (db *Db) getKeyDir(key []byte) (*KeyValueEntry, error) {
return nil, err
}

if v != nil {
tstampString, err := strconv.ParseInt(fmt.Sprint(v.tstamp), 10, 64)
if err != nil {
return nil, err
}
hasTimestampExpired := utils.HasTimestampExpired(tstampString)
if v.tstamp != 0 {
hasTimestampExpired := utils.HasTimestampExpired(v.tstamp)
if hasTimestampExpired {
db.keyDir.Delete(key)
return nil, ERROR_KEY_NOT_FOUND
Expand Down Expand Up @@ -435,22 +445,29 @@ func (db *Db) createActiveDatafile(dirPath string) error {
}
extension := path.Ext(file.Name())
if extension == ActiveKeyValueEntryDatafileSuffix {
inactiveName := fmt.Sprintf("%s.idfile", strings.Split(file.Name(), ".")[0])
inactiveName := stringBuilderPool.Get().(*strings.Builder)
defer stringBuilderPool.Put(inactiveName)

inactiveName.Reset()
inactiveName.Write([]byte(strings.Split(file.Name(), ".")[0]))
inactiveName.Write([]byte(".idfile"))

// inactiveName := fmt.Sprintf("%s.idfile", strings.Split(file.Name(), ".")[0])

oldPath := filepath.Join(dirPath, file.Name())
newPath := filepath.Join(dirPath, inactiveName)
newPath := filepath.Join(dirPath, inactiveName.String())
os.Rename(oldPath, newPath)

fp, err := db.getSegmentFilePointerFromPath(inactiveName)
fp, err := db.getSegmentFilePointerFromPath(inactiveName.String())
if err != nil {
return err
}
segment, ok := db.getSegment(file.Name())
if !ok {
return ERROR_CANNOT_READ_FILE
}
db.setSegment(inactiveName, segment)
segment.setPath(inactiveName)
db.setSegment(inactiveName.String(), segment)
segment.setPath(inactiveName.String())
segment.setWriter(fp)
}
}
Expand Down Expand Up @@ -780,7 +797,12 @@ func (db *Db) walk(s string, file fs.DirEntry, err error) error {
if (z.Endoffset-z.Startoffset)+info.Size() > DatafileThreshold {
db.closeActiveMergeDataFilePointer()
swapFilename := strings.Split(newPath, ".")[0]
err = os.Rename(newPath, fmt.Sprintf("%s.idfile", swapFilename))
sb := stringBuilderPool.Get().(strings.Builder)
defer stringBuilderPool.Put(sb)
sb.WriteString(swapFilename)
sb.WriteString(".idfile")

err = os.Rename(newPath, sb.String())
if err != nil {
return err
}
Expand Down Expand Up @@ -808,7 +830,8 @@ func (db *Db) merge() error {
}
for _, file := range files {
ff := strings.Split(file, ".")[0]
err := os.Rename(file, fmt.Sprintf("%s.idfile", ff))
x := ff + ".idfile"
err := os.Rename(file, x)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 91aee0a

Please sign in to comment.