Skip to content

Commit

Permalink
Merge pull request #8 from manosriram/feat/crc
Browse files Browse the repository at this point in the history
add CRC to KeyValueEntry
  • Loading branch information
manosriram authored Dec 25, 2023
2 parents 0a12719 + 9fbbfa2 commit 077a003
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 126 deletions.
117 changes: 42 additions & 75 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"errors"
"fmt"
"io"
"io/fs"
"log"
"os"
Expand Down Expand Up @@ -46,16 +45,24 @@ const (
)

const (
DeleteFlagOffset int64 = 1
TstampOffset = 11
KeySizeOffset = 21
ValueSizeOffset = 31

StaticChunkSize = 1 + 10 + 10 + 10
BTreeDegree = 10
CrcSize int64 = 5
DeleteFlagSize = 1
TstampSize = 10
KeySizeSize = 10
ValueSizeSize = 10
StaticChunkSize = CrcSize + DeleteFlagSize + TstampSize + KeySizeSize + ValueSizeSize

CrcOffset int64 = 5
DeleteFlagOffset = 6
TstampOffset = 16
KeySizeOffset = 26
ValueSizeOffset = 36

BTreeDegree int = 10
)

const (
TotalStaticChunkSize int64 = TstampOffset + KeySizeOffset + ValueSizeOffset + DeleteFlagOffset + StaticChunkSize
TotalStaticChunkSize int64 = TstampOffset + KeySizeOffset + ValueSizeOffset + DeleteFlagOffset + CrcOffset + StaticChunkSize
)

var (
Expand All @@ -64,6 +71,7 @@ var (
ERROR_OFFSET_EXCEEDED_FILE_SIZE = errors.New("offset exceeded file size")
ERROR_CANNOT_READ_FILE = errors.New("error reading file")
ERROR_KEY_VALUE_SIZE_EXCEEDED = errors.New(fmt.Sprintf("exceeded limit of %d bytes", BlockSize))
ERROR_CRC_DOES_NOT_MATCH = errors.New("crc does not match. corrupted datafile")
)

const (
Expand All @@ -79,7 +87,8 @@ const (
EXIT_NOT_OK = 0
EXIT_OK = 1

INITIAL_SEGMENT_OFFSET = 0
INITIAL_SEGMENT_OFFSET = 0
INITIAL_KEY_VALUE_ENTRY_OFFSET = 0
)

type Options struct {
Expand Down Expand Up @@ -291,54 +300,6 @@ func (db *Db) getKeyValueEntryFromOffsetViaFilePath(keyDirPath string) ([]byte,
return data, nil
}

func (db *Db) seekOffsetFromDataFile(kdValue KeyDirValue) (*KeyValueEntry, error) {
defer utils.Recover()

f, err := db.getSegmentFilePointerFromPath(kdValue.path)
if err != nil {
return nil, err
}

data := make([]byte, kdValue.size)
f.Seek(kdValue.offset, io.SeekCurrent)
f.Read(data)

deleted := data[0]
if deleted == DELETED_FLAG_BYTE_VALUE {
return nil, ERROR_KEY_NOT_FOUND
}

tstamp := data[DeleteFlagOffset:TstampOffset]
tstamp64Bit := utils.ByteToInt64(tstamp)
hasTimestampExpired := utils.HasTimestampExpired(tstamp64Bit)
if hasTimestampExpired {
return nil, ERROR_KEY_NOT_FOUND
}

ksz := data[TstampOffset:KeySizeOffset]
intKsz := utils.ByteToInt64(ksz)

// get value size
vsz := data[KeySizeOffset:ValueSizeOffset]
intVsz := utils.ByteToInt64(vsz)

// get key
k := data[ValueSizeOffset : ValueSizeOffset+intKsz]

// get value
v := data[ValueSizeOffset+intKsz : ValueSizeOffset+intKsz+intVsz]

return &KeyValueEntry{
tstamp: int64(tstamp64Bit),
ksz: int64(intKsz),
vsz: int64(intVsz),
k: k,
v: v,
offset: kdValue.offset,
size: kdValue.size,
}, nil
}

func (db *Db) getActiveFileKeyValueEntries(filePath string) ([]*KeyValueEntry, error) {
defer utils.Recover()

Expand Down Expand Up @@ -585,16 +546,12 @@ func (db *Db) limitDatafileToThreshold(newKeyValueEntry *KeyValueEntry, opts *Op
os.Remove(opts.MergeFilePath)
} else {
db.createActiveDatafile(db.dirPath)
newKeyValueEntry.offset = 0
newKeyValueEntry.offset = INITIAL_KEY_VALUE_ENTRY_OFFSET
}
}
}

func (db *Db) deleteKey(key []byte) error {
// TODO: move this to someplace better
db.mu.Lock()
defer db.mu.Unlock()

v := db.keyDir.Get(key)
if v == nil {
return ERROR_KEY_NOT_FOUND
Expand All @@ -618,9 +575,9 @@ func (db *Db) Get(key []byte) ([]byte, error) {
db.mu.Lock()
defer db.mu.Unlock()

v, _ := db.getKeyDir(key)
v, err := db.getKeyDir(key)
if v == nil {
return nil, ERROR_KEY_NOT_FOUND
return nil, err
}
return v.v, nil
}
Expand All @@ -630,20 +587,27 @@ func (db *Db) Get(key []byte) ([]byte, error) {
func (db *Db) Set(kv *KeyValuePair) (interface{}, error) {
intKSz := int64(len(kv.Key))
intVSz := int64(len(utils.Encode(kv.Value)))
newKeyValueEntry := NewKeyValueEntry(
DELETED_FLAG_UNSET_VALUE,
db.getLastOffset(),
int64(len(kv.Key)),
int64(len(utils.Encode(kv.Value))),
int64(StaticChunkSize+intKSz+intVSz),
kv.Key,
utils.Encode(kv.Value),
)
newKeyValueEntry.setTTL(kv)

newKeyValueEntry := &KeyValueEntry{
deleted: DELETED_FLAG_UNSET_VALUE,
offset: db.getLastOffset(),
ksz: int64(len(kv.Key)),
vsz: int64(len(utils.Encode(kv.Value))),
size: int64(StaticChunkSize + intKSz + intVSz),
k: kv.Key,
v: utils.Encode(kv.Value),
}

if kv.Ttl > 0 {
newKeyValueEntry.setTTLViaDuration(kv.Ttl)
} else {
newKeyValueEntry.setTTLViaDuration(KEY_EXPIRES_IN_DEFAULT)
}

db.mu.Lock()
defer db.mu.Unlock()
db.limitDatafileToThreshold(newKeyValueEntry, &Options{})
newKeyValueEntry.setCRC(newKeyValueEntry.calculateCRC())
err := db.writeKeyValueEntry(newKeyValueEntry)
if err != nil {
return nil, err
Expand All @@ -661,6 +625,9 @@ func (db *Db) Set(kv *KeyValuePair) (interface{}, error) {
// Deletes a key-value pair.
// Returns error if any.
func (db *Db) Delete(key []byte) error {
db.mu.Lock()
defer db.mu.Unlock()

err := db.deleteKey(key)
return err
}
Expand Down
18 changes: 7 additions & 11 deletions examples/b.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@

// import "fmt"

// type y struct {
// name string
// }

// func main() {
// x := [5]int{1, 2, 3, 4, 5}
// fmt.Println(x)
// for i, v := range x {
// v += 2
// x[i] = v
// }
// fmt.Println(x)
// // t := crc32.MakeTable(crc32.IEEE)
// // data := []byte("test")
// // hash := crc32.Checksum(data, t)
// // fmt.Println(utils.Int32ToByte(int32(hash)))

// s := "helloworld"
// fmt.Println(string(s[6]))
// }
26 changes: 26 additions & 0 deletions utils/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ func HasTimestampExpired(timestamp int64) bool {
return tstamp < now
}

func UInt32ToByte(n uint32) []byte {
b := make([]byte, binary.MaxVarintLen32)
binary.LittleEndian.PutUint32(b, n)
return b
}

func UInt64ToByte(n uint64) []byte {
b := make([]byte, binary.MaxVarintLen64)
binary.LittleEndian.PutUint64(b, n)
return b
}

func Int32ToByte(n int32) []byte {
b := make([]byte, binary.MaxVarintLen32)
binary.LittleEndian.PutUint32(b, uint32(n))
Expand All @@ -55,6 +67,20 @@ func Int64ToByte(n int64) []byte {
return b
}

func ByteToUInt64(b []byte) uint64 {
if lesser := len(b) < binary.MaxVarintLen64; lesser {
b = b[:cap(b)]
}
return binary.LittleEndian.Uint64(b)
}

func ByteToUInt32(b []byte) uint32 {
if lesser := len(b) < binary.MaxVarintLen32; lesser {
b = b[:cap(b)]
}
return binary.LittleEndian.Uint32(b)
}

func ByteToInt32(b []byte) int32 {
if lesser := len(b) < binary.MaxVarintLen32; lesser {
b = b[:cap(b)]
Expand Down
Loading

0 comments on commit 077a003

Please sign in to comment.