Skip to content

Commit

Permalink
Merge pull request #6 from rabbit2025/main
Browse files Browse the repository at this point in the history
Add interface IKVDB
  • Loading branch information
arnoworldx authored Jan 19, 2024
2 parents 1458323 + 67022c4 commit d146aea
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 53 deletions.
18 changes: 18 additions & 0 deletions indextree/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package indextree

import "github.com/smartbch/moeingads/types"

type IKVDB interface {
Close()
GetPruneHeight() (uint64, bool)
NewBatch() types.Batch
OpenNewBatch()
CloseOldBatch()
LockBatch()
UnlockBatch()
CurrBatch() types.Batch
Get(key []byte) []byte
Iterator(start, end []byte) types.Iterator
ReverseIterator(start, end []byte) types.Iterator
SetPruneHeight(h uint64)
}
22 changes: 11 additions & 11 deletions indextree/fuzz/itfuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,26 @@ var AllOnes = []byte{255, 255, 255, 255, 255, 255, 255, 255}
var AllZeros = []byte{0, 0, 0, 0, 0, 0, 0, 0}

type NVTreeRef struct {
rocksdb *it.RocksDB
kvdb it.IKVDB
batch types.Batch
currHeight [8]byte
}

func (tree *NVTreeRef) Init(dirname string) (err error) {
tree.rocksdb, err = it.NewRocksDB("idxtreeref", dirname)
tree.kvdb, err = it.NewRocksDB("idxtreeref", dirname)
if err != nil {
return err
}
return nil
}

func (tree *NVTreeRef) Close() {
tree.rocksdb.Close()
tree.kvdb.Close()
}

func (tree *NVTreeRef) BeginWrite(currHeight int64) {
//fmt.Printf("========= currHeight %d =========\n", currHeight)
tree.batch = tree.rocksdb.NewBatch()
tree.batch = tree.kvdb.NewBatch()
binary.BigEndian.PutUint64(tree.currHeight[:], uint64(currHeight))
}

Expand All @@ -69,7 +69,7 @@ func (tree *NVTreeRef) Set(k []byte, v int64) {
}

func (tree *NVTreeRef) Get(k []byte) (int64, bool) {
value := tree.rocksdb.Get(append([]byte{1}, k...))
value := tree.kvdb.Get(append([]byte{1}, k...))
if len(value) == 0 {
return 0, false
}
Expand All @@ -80,7 +80,7 @@ func (tree *NVTreeRef) GetAtHeight(k []byte, height uint64) (position int64, ok
copyK := append([]byte{0}, k...)
copyK = append(copyK, AllOnes...)
binary.BigEndian.PutUint64(copyK[len(copyK)-8:], height+1) //overwrite the 'AllOnes' part
iter := tree.rocksdb.ReverseIterator(AllZeros, copyK)
iter := tree.kvdb.ReverseIterator(AllZeros, copyK)
defer iter.Close()
//fmt.Printf(" the key : %#v copyK %#v\n", iter.Key(), copyK)
if !iter.Valid() || len(iter.Value()) == 0 || !bytes.Equal(iter.Key()[1:9], k) {
Expand All @@ -97,11 +97,11 @@ func (tree *NVTreeRef) Delete(k []byte) {
}

func (tree *NVTreeRef) Iterator(start, end []byte) types.Iterator {
return tree.rocksdb.Iterator(append([]byte{1}, start...), append([]byte{1}, end...))
return tree.kvdb.Iterator(append([]byte{1}, start...), append([]byte{1}, end...))
}

func (tree *NVTreeRef) ReverseIterator(start, end []byte) types.Iterator {
return tree.rocksdb.ReverseIterator(append([]byte{1}, start...), append([]byte{1}, end...))
return tree.kvdb.ReverseIterator(append([]byte{1}, start...), append([]byte{1}, end...))
}

func assert(b bool, s string) {
Expand Down Expand Up @@ -208,7 +208,7 @@ func FuzzDelete(trMem *it.NVTreeMem, refTree *NVTreeRef, cfg FuzzConfig, rs rand
//return
}

func FuzzInit(rocksdb *it.RocksDB, trMem *it.NVTreeMem, refTree *NVTreeRef, cfg FuzzConfig, rs randsrc.RandSrc, h uint64, changeMap map[string]int64) {
func FuzzInit(kvdb it.IKVDB, trMem *it.NVTreeMem, refTree *NVTreeRef, cfg FuzzConfig, rs randsrc.RandSrc, h uint64, changeMap map[string]int64) {
for i := 0; i < cfg.InitCount; i++ {
// set new key/value
key, value := getRandKey(rs), (rs.GetInt64() & ((int64(1) << 48) - 1))
Expand All @@ -228,7 +228,7 @@ func FuzzInit(rocksdb *it.RocksDB, trMem *it.NVTreeMem, refTree *NVTreeRef, cfg
assert(ok, "Get must return ok")
changeMap[string(key[1:])] = value
}
rocksdb.OpenNewBatch()
kvdb.OpenNewBatch()
trMem.BeginWrite(int64(h))
refTree.BeginWrite(int64(h))
for key, value := range changeMap {
Expand All @@ -242,7 +242,7 @@ func FuzzInit(rocksdb *it.RocksDB, trMem *it.NVTreeMem, refTree *NVTreeRef, cfg
}
}
trMem.EndWrite()
rocksdb.CloseOldBatch()
kvdb.CloseOldBatch()
refTree.EndWrite()
}

Expand Down
24 changes: 12 additions & 12 deletions indextree/indextree.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,25 @@ type NVTreeMem struct {
mtx sync.RWMutex
bt *b.Tree
isWriting bool
rocksdb *RocksDB
kvdb IKVDB
currHeight [8]byte
duringInit bool
}

var _ types.IndexTree = (*NVTreeMem)(nil)

func NewNVTreeMem(rocksdb *RocksDB) *NVTreeMem {
func NewNVTreeMem(kvdb IKVDB) *NVTreeMem {
btree := b.TreeNew()
return &NVTreeMem{
bt: btree,
rocksdb: rocksdb,
bt: btree,
kvdb: kvdb,
}
}

// Load the RocksDB and use its up-to-date records to initialize the in-memory B-Tree.
// RocksDB's historical records are ignored.
func (tree *NVTreeMem) Init(repFn func([]byte)) (err error) {
iter := tree.rocksdb.ReverseIterator([]byte{}, []byte(nil))
iter := tree.kvdb.ReverseIterator([]byte{}, []byte(nil))
defer iter.Close()
var key []byte
for iter.Valid() {
Expand Down Expand Up @@ -201,7 +201,7 @@ func (tree *NVTreeMem) Set(k []byte, v int64) {
}
tree.bt.Set(binary.BigEndian.Uint64(k), v)

if tree.rocksdb == nil || tree.duringInit {
if tree.kvdb == nil || tree.duringInit {
return
}
newK := make([]byte, 0, 1+len(k)+8)
Expand All @@ -214,9 +214,9 @@ func (tree *NVTreeMem) Set(k []byte, v int64) {
}

func (tree *NVTreeMem) batchSet(key, value []byte) {
tree.rocksdb.LockBatch()
tree.rocksdb.CurrBatch().Set(key, value)
tree.rocksdb.UnlockBatch()
tree.kvdb.LockBatch()
tree.kvdb.CurrBatch().Set(key, value)
tree.kvdb.UnlockBatch()
}

//func (tree *NVTreeMem) batchDelete(key []byte) {
Expand All @@ -237,13 +237,13 @@ func (tree *NVTreeMem) Get(k []byte) (int64, bool) {

// Get the position of k, at the specified height.
func (tree *NVTreeMem) GetAtHeight(k []byte, height uint64) (position int64, ok bool) {
if h, enable := tree.rocksdb.GetPruneHeight(); enable && height <= h {
if h, enable := tree.kvdb.GetPruneHeight(); enable && height <= h {
return 0, false
}
newK := make([]byte, 1+len(k)+8) // all bytes equal zero
copy(newK[1:], k)
binary.BigEndian.PutUint64(newK[1+len(k):], height+1)
iter := tree.rocksdb.ReverseIterator([]byte{}, newK)
iter := tree.kvdb.ReverseIterator([]byte{}, newK)
defer iter.Close()
if !iter.Valid() || !bytes.Equal(iter.Key()[1:1+len(k)], k) { //not exists or to a different key
return 0, false
Expand Down Expand Up @@ -272,7 +272,7 @@ func (tree *NVTreeMem) Delete(k []byte) {
}
tree.bt.Delete(key)

if tree.rocksdb == nil || tree.duringInit {
if tree.kvdb == nil || tree.duringInit {
return
}
newK := make([]byte, 0, 1+len(k)+8)
Expand Down
2 changes: 1 addition & 1 deletion indextree/indextree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func mustGetH(tree *NVTreeMem, key uint64, height uint64) int64 {
return res
}

func createNVTreeMem(dirname string) (*RocksDB, *NVTreeMem) {
func createNVTreeMem(dirname string) (IKVDB, *NVTreeMem) {
rocksdb, err := NewRocksDB("idxtree", dirname)
if err != nil {
panic(err)
Expand Down
5 changes: 2 additions & 3 deletions indextree/rocks_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (db *RocksDB) OpenNewBatch() {
db.batch = &rocksDBBatch{db, batch}
}

func NewRocksDB(name string, dir string) (*RocksDB, error) {
func NewRocksDB(name string, dir string) (IKVDB, error) {
// default rocksdb option, good enough for most cases, including heavy workloads.
// 64MB table cache, 32MB write buffer
// compression: snappy as default, need to -lsnappy to enable.
Expand All @@ -93,7 +93,7 @@ func NewRocksDB(name string, dir string) (*RocksDB, error) {
return NewRocksDBWithOptions(name, dir, opts)
}

func NewRocksDBWithOptions(name string, dir string, opts *gorocksdb.Options) (*RocksDB, error) {
func NewRocksDBWithOptions(name string, dir string, opts *gorocksdb.Options) (IKVDB, error) {
dbPath := filepath.Join(dir, name+".db")
filter := HeightCompactionFilter{}
opts.SetCompactionFilter(&filter) // use a customized compaction filter
Expand Down Expand Up @@ -301,7 +301,6 @@ func (mBatch *rocksDBBatch) Close() {
}
}


//----------------------------------------
// Iterator

Expand Down
4 changes: 2 additions & 2 deletions metadb/metadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
)

type MetaDB struct {
kvdb *indextree.RocksDB
kvdb indextree.IKVDB
currHeight int64
lastPrunedTwig [types.ShardCount]int64
maxSerialNum [types.ShardCount]int64
Expand All @@ -35,7 +35,7 @@ type MetaDB struct {

var _ types.MetaDB = (*MetaDB)(nil)

func NewMetaDB(kvdb *indextree.RocksDB) *MetaDB {
func NewMetaDB(kvdb indextree.IKVDB) *MetaDB {
return &MetaDB{kvdb: kvdb}
}

Expand Down
36 changes: 18 additions & 18 deletions moeingads.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type MoeingADS struct {
meta types.MetaDB
idxTree types.IndexTree
datTree [types.ShardCount]types.DataTree
rocksdb *indextree.RocksDB
kvdb indextree.IKVDB
k2heMap *BucketMap // key-to-hot-entry map
nkSet *BucketSet // next-key set
tempEntries64 [BucketCount][]*HotEntry
Expand Down Expand Up @@ -66,13 +66,13 @@ func NewMoeingADS4Mock(startEndKeys [][]byte) *MoeingADS {
mads.idxTree = indextree.NewMockIndexTree()

var err error
mads.rocksdb, err = indextree.NewRocksDB("rocksdb", "./")
mads.kvdb, err = indextree.NewRocksDB("rocksdb", "./")
if err != nil {
panic(err)
}

mads.meta = metadb.NewMetaDB(mads.rocksdb)
mads.rocksdb.OpenNewBatch()
mads.meta = metadb.NewMetaDB(mads.kvdb)
mads.kvdb.OpenNewBatch()
mads.initGuards()
return mads
}
Expand All @@ -98,18 +98,18 @@ func NewMoeingADS(dirName string, canQueryHistory bool, startEndKeys [][]byte) (
_ = os.Mkdir(dirName, 0700)
}

mads.rocksdb, err = indextree.NewRocksDB("rocksdb", dirName)
mads.kvdb, err = indextree.NewRocksDB("rocksdb", dirName)
if err != nil {
panic(err)
}
mads.meta = metadb.NewMetaDB(mads.rocksdb)
mads.meta = metadb.NewMetaDB(mads.kvdb)
if !dirNotExists {
mads.meta.ReloadFromKVDB()
//mads.meta.PrintInfo()
}

if canQueryHistory {
mads.idxTree = indextree.NewNVTreeMem(mads.rocksdb)
mads.idxTree = indextree.NewNVTreeMem(mads.kvdb)
} else {
mads.idxTree = indextree.NewNVTreeMem(nil)
}
Expand All @@ -118,7 +118,7 @@ func NewMoeingADS(dirName string, canQueryHistory bool, startEndKeys [][]byte) (
suffix := fmt.Sprintf(".%d", i)
mads.datTree[i] = datatree.NewEmptyTree(HPFileBufferSize, DefaultHPFileSize, dirName, suffix)
}
mads.rocksdb.OpenNewBatch()
mads.kvdb.OpenNewBatch()
mads.meta.Init()
for i := 0; i < types.ShardCount; i++ {
for j := 0; j < 2048; j++ {
Expand All @@ -129,7 +129,7 @@ func NewMoeingADS(dirName string, canQueryHistory bool, startEndKeys [][]byte) (
}
}
mads.initGuards()
mads.rocksdb.CloseOldBatch()
mads.kvdb.CloseOldBatch()
} else {
mads.recoverDataTrees(dirName)
if canQueryHistory {
Expand Down Expand Up @@ -193,8 +193,8 @@ func (mads *MoeingADS) initGuards() {
for i := 0; i < types.ShardCount; i++ {
mads.datTree[i].WaitForFlushing()
}
mads.rocksdb.CloseOldBatch()
mads.rocksdb.OpenNewBatch()
mads.kvdb.CloseOldBatch()
mads.kvdb.OpenNewBatch()
}

func (mads *MoeingADS) recoverDataTrees(dirName string) {
Expand Down Expand Up @@ -232,14 +232,14 @@ func (mads *MoeingADS) PrintIdxTree() {

func (mads *MoeingADS) Close() {
mads.idxTree.Close()
mads.rocksdb.Close()
mads.kvdb.Close()
for i := 0; i < types.ShardCount; i++ {
mads.datTree[i].Close()
mads.datTree[i] = nil
}
mads.meta.Close()
mads.idxTree = nil
mads.rocksdb = nil
mads.kvdb = nil
mads.meta = nil
mads.k2heMap = nil
mads.nkSet = nil
Expand Down Expand Up @@ -419,7 +419,7 @@ func (mads *MoeingADS) GetCurrHeight() int64 {
}

func (mads *MoeingADS) BeginWrite(height int64) {
mads.rocksdb.OpenNewBatch()
mads.kvdb.OpenNewBatch()
mads.idxTree.BeginWrite(height)
mads.meta.SetCurrHeight(height)
}
Expand Down Expand Up @@ -712,7 +712,7 @@ func (mads *MoeingADS) EndWrite() {
}
mads.meta.Commit()
mads.idxTree.EndWrite()
mads.rocksdb.CloseOldBatch()
mads.kvdb.CloseOldBatch()
}

func (mads *MoeingADS) PruneBeforeHeight(height int64) {
Expand All @@ -730,7 +730,7 @@ func (mads *MoeingADS) PruneBeforeHeight(height int64) {
end--
starts[shardID], ends[shardID] = start, end
})
mads.rocksdb.OpenNewBatch()
mads.kvdb.OpenNewBatch()
for shardID := 0; shardID < types.ShardCount; shardID++ {
start, end := starts[shardID], ends[shardID]
if end > start+datatree.MinPruneCount {
Expand All @@ -742,8 +742,8 @@ func (mads *MoeingADS) PruneBeforeHeight(height int64) {
mads.meta.SetLastPrunedTwig(shardID, end-1)
}
}
mads.rocksdb.CloseOldBatch()
mads.rocksdb.SetPruneHeight(uint64(height))
mads.kvdb.CloseOldBatch()
mads.kvdb.SetPruneHeight(uint64(height))
}

func (mads *MoeingADS) CheckHashConsistency() {
Expand Down
Loading

0 comments on commit d146aea

Please sign in to comment.