From 98630fa5f1a66dd360e28a368ce2a13b8548d0a8 Mon Sep 17 00:00:00 2001 From: ZHAO Date: Fri, 8 Mar 2019 11:10:55 +0800 Subject: [PATCH 01/15] fix struct tag --- config/server_config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/server_config.go b/config/server_config.go index 3ba2f56..847b215 100644 --- a/config/server_config.go +++ b/config/server_config.go @@ -26,7 +26,7 @@ type ServerConfig struct { Threads int `yaml:",omitempty"` // NumCPU ErrorLog string `yaml:",omitempty"` AccessLog string `yaml:",omitempty"` - AnalysisLog string `yanl:",omitempty"` + AnalysisLog string `yaml:",omitempty"` StaticDir string `yaml:",omitempty"` // directory for static files, e.g. *.html } From c88f2574c084093fbccd1f3f6c66e86e6d95026f Mon Sep 17 00:00:00 2001 From: ZHAO Date: Wed, 10 Apr 2019 17:36:30 +0800 Subject: [PATCH 02/15] reuse buffer when gc --- gobeansdb/web.go | 11 +++++++++++ loghub/analysislog.go | 2 ++ loghub/errorlog.go | 2 ++ store/config.go | 2 ++ store/config_default.go | 3 ++- store/data.go | 4 ++-- store/datachunk.go | 20 +++++++++----------- store/datafile.go | 20 ++++++++++++++------ store/gc.go | 2 +- store/hstore.go | 13 +++++++++++++ store/hstore_test.go | 10 +--------- 11 files changed, 59 insertions(+), 30 deletions(-) diff --git a/gobeansdb/web.go b/gobeansdb/web.go index 3570126..2ec5524 100644 --- a/gobeansdb/web.go +++ b/gobeansdb/web.go @@ -137,6 +137,11 @@ func handleYaml(w http.ResponseWriter, v interface{}) { } } +func getGCQuery(r *http.Request) bool { + s := filepath.Base(r.URL.Path) + return s == "query" +} + func getBucket(r *http.Request) (bucketID int64, err error) { s := filepath.Base(r.URL.Path) return strconv.ParseInt(s, 16, 16) @@ -323,6 +328,12 @@ func handleGC(w http.ResponseWriter, r *http.Request) { } }() + isQuery := getGCQuery(r) + if isQuery { + result = storage.hstore.GCBuckets() + return + } + bucketID, err = getBucket(r) if err != nil { return diff --git a/loghub/analysislog.go b/loghub/analysislog.go index ec9f19e..4d7dc92 100644 --- a/loghub/analysislog.go +++ b/loghub/analysislog.go @@ -41,7 +41,9 @@ func (hub *AnalysisLogHub) Log(name string, level int, file string, line int, ms hub.logger.Printf(AnalysisLogFormat, msg) bufline := &BufferLine{time.Now(), level, file, line, msg} hub.Add(bufline) + hub.Lock() hub.Last[level] = bufline + hub.Unlock() if level == FATAL { os.Exit(1) } diff --git a/loghub/errorlog.go b/loghub/errorlog.go index 585cbfd..55087dd 100644 --- a/loghub/errorlog.go +++ b/loghub/errorlog.go @@ -45,7 +45,9 @@ func (hub *ErrorLogHub) Log(name string, level int, file string, line int, msg s hub.logger.Printf(ErrorLogFormat, levelString[level], file, line, msg) bufline := &BufferLine{time.Now(), level, file, line, msg} hub.Add(bufline) + hub.Lock() hub.Last[level] = bufline + hub.Unlock() if level == FATAL { os.Exit(1) } diff --git a/store/config.go b/store/config.go index 7593066..47bbd64 100644 --- a/store/config.go +++ b/store/config.go @@ -46,6 +46,8 @@ type DataConfig struct { FlushWakeStr string `yaml:"flush_wake_str"` // DataFileMaxStr string `yaml:"datafile_max_str,omitempty"` + BufIOCap int `yaml:"-"` // for bufio reader/writer, if value is big, then enlarge this cap, defalult: 1MB + BufIOCapStr string `yaml:"bufio_cap_str,omitempty"` NotCompress map[string]bool `yaml:"not_compress,omitempty"` // kind do not compress } diff --git a/store/config_default.go b/store/config_default.go index 9c8f820..09464d8 100644 --- a/store/config_default.go +++ b/store/config_default.go @@ -18,7 +18,7 @@ 
import "github.com/douban/gobeansdb/config" NoMerged: false MergeInterval: 1 -3. reasonabe setting for others +3. reasonable setting for others */ @@ -40,6 +40,7 @@ var ( CheckVHash: false, FlushInterval: 0, FlushWakeStr: "0", + BufIOCapStr: "1M", NoGCDays: 0, NotCompress: map[string]bool{ diff --git a/store/data.go b/store/data.go index c855f56..d5a6de7 100644 --- a/store/data.go +++ b/store/data.go @@ -173,7 +173,7 @@ func (ds *dataStore) ListFiles() (max int, err error) { func (ds *dataStore) GetStreamReader(chunk int) (*DataStreamReader, error) { path := ds.genPath(chunk) - return newDataStreamReader(path, 1<<20) + return newDataStreamReader(path, Conf.BufIOCap) } func GetStreamWriter(path string, isappend bool) (*DataStreamWriter, error) { @@ -204,7 +204,7 @@ func GetStreamWriter(path string, isappend bool) (*DataStreamWriter, error) { return nil, err } } - wbuf := bufio.NewWriterSize(fd, 1<<20) + wbuf := bufio.NewWriterSize(fd, Conf.BufIOCap) w := &DataStreamWriter{path: path, fd: fd, wbuf: wbuf, offset: offset} return w, nil } diff --git a/store/datachunk.go b/store/datachunk.go index acaf0e4..c43d944 100644 --- a/store/datachunk.go +++ b/store/datachunk.go @@ -10,11 +10,6 @@ import ( "github.com/douban/gobeansdb/utils" ) -var ( - GCWriteBufferSizeDefault = uint32(1 << 20) - GCWriteBufferSize = GCWriteBufferSizeDefault -) - type dataChunk struct { sync.Mutex @@ -63,7 +58,6 @@ func (dc *dataChunk) AppendRecordGC(wrec *WriteRecord) (offset uint32, err error wrec.pos.ChunkID = dc.chunkid offset = dc.writingHead wrec.pos.Offset = offset - dc.wbuf = append(dc.wbuf, wrec) size := wrec.rec.Payload.RecSize dc.writingHead += size @@ -72,10 +66,14 @@ func (dc *dataChunk) AppendRecordGC(wrec *WriteRecord) (offset uint32, err error } dc.Unlock() - dc.gcbufsize += size - if dc.gcbufsize > GCWriteBufferSize { - _, err = dc.flush(dc.gcWriter, true) - dc.gcbufsize = 0 + _, err = dc.gcWriter.append(wrec) + if err != nil { + logger.Fatalf("fail to append, stop! err: %v", err) + } + + if err = dc.gcWriter.wbuf.Flush(); err != nil { + logger.Fatalf("write data fail, stop! 
err: %v", err) + return 0, err } return } @@ -203,7 +201,7 @@ func (dc *dataChunk) beginGCWriting(srcChunk int) (err error) { func (dc *dataChunk) endGCWriting() (err error) { logger.Infof("endGCWriting chunk %d rewrite %v size %d wsize%d ", dc.chunkid, dc.rewriting, dc.size, dc.writingHead) if dc.gcWriter != nil { - _, err = dc.flush(dc.gcWriter, true) + err = dc.gcWriter.wbuf.Flush() dc.gcWriter.Close() dc.gcWriter = nil } diff --git a/store/datafile.go b/store/datafile.go index a1789fb..efcb198 100644 --- a/store/datafile.go +++ b/store/datafile.go @@ -178,6 +178,8 @@ type DataStreamReader struct { chunk int offset uint32 + + maxBodyBuf []byte } func newDataStreamReader(path string, bufsz int) (*DataStreamReader, error) { @@ -187,11 +189,12 @@ func newDataStreamReader(path string, bufsz int) (*DataStreamReader, error) { return nil, err } rbuf := bufio.NewReaderSize(fd, bufsz) - return &DataStreamReader{path: path, fd: fd, rbuf: rbuf, offset: 0}, nil + maxBodyBuf := make([]byte, 0, config.MCConf.BodyMax) + return &DataStreamReader{path: path, fd: fd, rbuf: rbuf, offset: 0, maxBodyBuf: maxBodyBuf}, nil } func (stream *DataStreamReader) seek(offset uint32) { - stream.fd.Seek(int64(offset), os.SEEK_SET) + stream.fd.Seek(int64(offset), io.SeekStart) stream.offset = offset } @@ -208,7 +211,7 @@ func (stream *DataStreamReader) nextValid() (rec *Record, offset uint32, sizeBro logger.Infof("crc fail end offset 0x%x, sizeBroken 0x%x", offset2, sizeBroken) _, rsize := wrec.rec.Sizes() offset3 := offset2 + rsize - stream.fd.Seek(int64(offset3), 0) + stream.fd.Seek(int64(offset3), io.SeekStart) stream.rbuf.Reset(stream.fd) stream.offset = offset2 + rsize return wrec.rec, offset2, sizeBroken, nil @@ -233,8 +236,13 @@ func (stream *DataStreamReader) Next() (res *Record, offset uint32, sizeBroken u return } wrec.decodeHeader() - if wrec.ksz > 250 || wrec.ksz <= 0 { // TODO - logger.Errorf("bad key len %s %d %d %d", stream.fd.Name(), stream.offset, wrec.ksz, wrec.vsz) + if !config.IsValidKeySize(wrec.ksz) { + logger.Errorf("gc: bad key len %s %d %d %d", stream.fd.Name(), stream.offset, wrec.ksz, wrec.vsz) + return stream.nextValid() + } + + if !config.IsValidValueSize(wrec.vsz) { + logger.Errorf("gc: bad value len %s %d %d %d", stream.fd.Name(), stream.offset, wrec.ksz, wrec.vsz) return stream.nextValid() } @@ -243,7 +251,7 @@ func (stream *DataStreamReader) Next() (res *Record, offset uint32, sizeBroken u logger.Errorf(err.Error()) return } - wrec.rec.Payload.Body = make([]byte, wrec.vsz) + wrec.rec.Payload.Body = stream.maxBodyBuf[:wrec.vsz] if _, err = io.ReadFull(stream.rbuf, wrec.rec.Payload.Body); err != nil { logger.Errorf(err.Error()) return diff --git a/store/gc.go b/store/gc.go index 6efca84..58b9a3a 100644 --- a/store/gc.go +++ b/store/gc.go @@ -294,7 +294,7 @@ func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) { wrec := wrapRecord(rec) recsize := wrec.rec.Payload.RecSize fileState.addRecord(recsize, isNewest, isDeleted, sizeBroken) - // logger.Infof("%v %v %v %v", ki.StringKey, isNewest, isCoverdByCollision, isDeleted) + //logger.Infof("key stat: %v %v %v %v", ki.StringKey, isNewest, isCoverdByCollision, isDeleted) if !isNewest { continue } diff --git a/store/hstore.go b/store/hstore.go index 6956974..dac7999 100644 --- a/store/hstore.go +++ b/store/hstore.go @@ -5,6 +5,8 @@ import ( "fmt" "os" "path/filepath" + "strconv" + "strings" "sync" "sync/atomic" "time" @@ -244,6 +246,17 @@ func (store *HStore) ListDir(ki *KeyInfo) ([]byte, error) { return store.ListUpper(ki) 
} +func (store *HStore) GCBuckets() string { + var result strings.Builder + store.gcMgr.mu.Lock() + for k := range store.gcMgr.stat { + result.WriteString(strconv.FormatInt(int64(k), 16)) + result.WriteString(",") + } + store.gcMgr.mu.Unlock() + return strings.TrimSuffix(result.String(), ",") +} + func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, pretend bool) (begin, end int, err error) { if bucketID >= Conf.NumBucket { err = fmt.Errorf("bad bucket id: %d", bucketID) diff --git a/store/hstore_test.go b/store/hstore_test.go index 2ab0fa2..63bbcf8 100644 --- a/store/hstore_test.go +++ b/store/hstore_test.go @@ -937,19 +937,11 @@ func testGCToLast(t *testing.T, store *HStore, bucketID, numRecPerFile int) { readfunc() } -func TestGCMultiBigBuffer(t *testing.T) { - testGC(t, testGCMulti, "multiBig", 10000) -} - func TestGCAfterRebuildHtree(t *testing.T) { testGC(t, testGCAfterRebuildHTree, "gc build htree", 1000) } -func TestGCMultiSmallBuffer(t *testing.T) { - GCWriteBufferSize = 256 - defer func() { - GCWriteBufferSize = GCWriteBufferSizeDefault - }() +func TestGCMultiBuffer(t *testing.T) { testGC(t, testGCMulti, "multiSmalll", 1000) } From 8a8bf89824fc224e6f5538a63147066ee57b3f23 Mon Sep 17 00:00:00 2001 From: ZHAO Date: Wed, 13 Feb 2019 18:27:22 +0800 Subject: [PATCH 03/15] cancel gc graceful --- gobeansdb/web.go | 46 +++++++++++++++++------------- store/gc.go | 23 ++++++++++++++- store/hstore.go | 40 ++++++++++++++++++++++++-- store/hstore_test.go | 68 ++++++++++++++++++++++++++++---------------- 4 files changed, 129 insertions(+), 48 deletions(-) diff --git a/gobeansdb/web.go b/gobeansdb/web.go index 2ec5524..9405c62 100644 --- a/gobeansdb/web.go +++ b/gobeansdb/web.go @@ -10,7 +10,7 @@ import ( "runtime/debug" "strconv" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" "github.com/douban/gobeansdb/cmem" "github.com/douban/gobeansdb/config" @@ -55,7 +55,6 @@ func init() { http.HandleFunc("/statgetset", handleStatGetSet) http.HandleFunc("/freememory", handleFreeMemory) - } func initWeb() { @@ -314,19 +313,6 @@ func handleGC(w http.ResponseWriter, r *http.Request) { var err error var bucketID int64 var pretend bool - defer func() { - if err != nil { - e := fmt.Sprintf("
<p> err : %s </p>", err.Error()) - w.Write([]byte(e)) - showBucket(w, "gc") - } else { - if !pretend { - result2 := fmt.Sprintf("<a href='/bucket/%d'> /bucket/%d </a> <p/>
", bucketID, bucketID) - result = result2 + result - } - w.Write([]byte(result)) - } - }() isQuery := getGCQuery(r) if isQuery { @@ -338,6 +324,8 @@ func handleGC(w http.ResponseWriter, r *http.Request) { if err != nil { return } + bktID := int(bucketID) + r.ParseForm() start, err := getFormValueInt(r, "start", -1) if err != nil { @@ -353,16 +341,36 @@ func handleGC(w http.ResponseWriter, r *http.Request) { } s := r.FormValue("run") - pretend = (s != "true") + pretend = s != "true" s = r.FormValue("merge") - merge := (s == "true") + merge := s == "true" + + s = r.FormValue("cancel") + gcCancel := s == "true" - start, end, err = storage.hstore.GC(int(bucketID), start, end, noGCDays, merge, pretend) - if err == nil { + if gcCancel { + result := storage.hstore.CancelGC(bktID) + fmt.Fprint(w, result) + return + } + + start, end, err = storage.hstore.GC(bktID, start, end, noGCDays, merge, pretend) + + if err != nil { + e := fmt.Sprintf("
<p> err : %s </p>", err.Error()) + w.Write([]byte(e)) + showBucket(w, "gc") + } else { + result = fmt.Sprintf("<p> bucket %d, start %d, end %d, merge %v, pretend %v </p>", bucketID, start, end, merge, pretend) + if !pretend { + result2 := fmt.Sprintf("<a href='/bucket/%d'> /bucket/%d </a> <p/>
", bucketID, bucketID) + result = result2 + result + } + w.Write([]byte(result)) } + return } func handleDU(w http.ResponseWriter, r *http.Request) { diff --git a/store/gc.go b/store/gc.go index 58b9a3a..a29638d 100644 --- a/store/gc.go +++ b/store/gc.go @@ -1,6 +1,7 @@ package store import ( + "context" "fmt" "sync" "time" @@ -8,6 +9,11 @@ import ( "github.com/douban/gobeansdb/config" ) +type GCCancelCtx struct { + Cancel context.CancelFunc + ChunkChan chan int +} + type GCMgr struct { mu sync.RWMutex stat map[int]*GCState // map[bucketID]*GCState @@ -181,7 +187,7 @@ func (bkt *Bucket) gcCheckRange(startChunkID, endChunkID, noGCDays int) (start, return } -func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) { +func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunkID, endChunkID int, merge bool) { logger.Infof("begin GC bucket %d chunk [%d, %d]", bkt.ID, startChunkID, endChunkID) @@ -229,6 +235,21 @@ func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) { bkt.hints.trydump(gc.Dst, true) }() + go func() { + select { + case <-ctx.Done(): + tmp := gc.End + if gc.Src <= gc.End { + gc.End = gc.Src + } + logger.Infof("change bucket %d gc end to current [%d -> %d]", bkt.ID, tmp, gc.End) + ch <- gc.End + gcContextMap.rw.Lock() + delete(gcContextMap.m, bkt.ID) + gcContextMap.rw.Unlock() + } + }() + for gc.Src = gc.Begin; gc.Src <= gc.End; gc.Src++ { if bkt.datas.chunks[gc.Src].size <= 0 { logger.Infof("skip empty chunk %d", gc.Src) diff --git a/store/hstore.go b/store/hstore.go index dac7999..90aa206 100644 --- a/store/hstore.go +++ b/store/hstore.go @@ -2,6 +2,7 @@ package store import ( "bytes" + "context" "fmt" "os" "path/filepath" @@ -18,8 +19,14 @@ import ( ) var ( - logger = loghub.ErrorLogger - mergeChan chan int + logger = loghub.ErrorLogger + mergeChan chan int + gcLock sync.Mutex + gcContext = context.Background() + gcContextMap = struct { + m map[int]GCCancelCtx + rw sync.RWMutex + }{m: make(map[int]GCCancelCtx)} ) type HStore struct { @@ -288,7 +295,34 @@ func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, if pretend { return } - go store.gcMgr.gc(bkt, begin, end, merge) + + ctx, cancel := context.WithCancel(gcContext) + gcContextMap.rw.Lock() + gcContextMap.m[bucketID] = GCCancelCtx{Cancel: cancel, ChunkChan: make(chan int, 1)} + ch := gcContextMap.m[bucketID].ChunkChan + gcContextMap.rw.Unlock() + + go store.gcMgr.gc(ctx, ch, bkt, begin, end, merge) + return +} + +func (store *HStore) CancelGC(bucketID int) (result string) { + // prevent gc same bucket concurrent + gcLock.Lock() + defer gcLock.Unlock() + + // will delete key in goroutine + gcContextMap.rw.RLock() + gcCancelCtx, ok := gcContextMap.m[bucketID] + gcContextMap.rw.RUnlock() + + if ok { + gcCancelCtx.Cancel() + chunkID := <-gcCancelCtx.ChunkChan + result = fmt.Sprintf("cancel gc on bucket %d when gc chunk %d finished", bucketID, chunkID) + } else { + result = fmt.Sprintf("bucket %d not gcing", bucketID) + } return } diff --git a/store/hstore_test.go b/store/hstore_test.go index 63bbcf8..7f17958 100644 --- a/store/hstore_test.go +++ b/store/hstore_test.go @@ -2,8 +2,12 @@ package store import ( "bytes" + "context" "flag" "fmt" + "github.com/douban/gobeansdb/cmem" + "github.com/douban/gobeansdb/config" + "github.com/douban/gobeansdb/utils" "os" "path/filepath" "reflect" @@ -12,10 +16,6 @@ import ( "strings" "sync" "testing" - - "github.com/douban/gobeansdb/cmem" - "github.com/douban/gobeansdb/config" - 
"github.com/douban/gobeansdb/utils" ) var ( @@ -283,7 +283,7 @@ func checkDataSize(t *testing.T, ds *dataStore, sizes0 []uint32) { } } -func testGCUpdateSame(t *testing.T, store *HStore, bucketID, numRecPerFile int) { +func testGCUpdateSame(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { gen := newKVGen(16) var ki KeyInfo @@ -312,7 +312,7 @@ func testGCUpdateSame(t *testing.T, store *HStore, bucketID, numRecPerFile int) } store.flushdatas(true) bkt := store.buckets[bucketID] - store.gcMgr.gc(bkt, 0, 0, true) + store.gcMgr.gc(ctx, ch, bkt, 0, 0, true) for i := 0; i < N; i++ { payload := gen.gen(&ki, i, 1) @@ -347,7 +347,7 @@ func testGCUpdateSame(t *testing.T, store *HStore, bucketID, numRecPerFile int) } } -func testGCNothing(t *testing.T, store *HStore, bucketID, numRecPerFile int) { +func testGCNothing(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { gen := newKVGen(16) var ki KeyInfo @@ -369,7 +369,7 @@ func testGCNothing(t *testing.T, store *HStore, bucketID, numRecPerFile int) { store.flushdatas(true) bkt := store.buckets[bucketID] - store.gcMgr.gc(bkt, 0, 0, true) + store.gcMgr.gc(ctx, ch, bkt, 0, 0, true) for i := 0; i < N; i++ { payload := gen.gen(&ki, i, 0) payload2, pos, err := store.Get(&ki, false) @@ -404,7 +404,7 @@ func testGCNothing(t *testing.T, store *HStore, bucketID, numRecPerFile int) { } } -func testGCAfterRebuildHTree(t *testing.T, store *HStore, bucketID, numRecPerFile int) { +func testGCAfterRebuildHTree(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { config.MCConf.BodyMax = 512 defer func() { config.MCConf.BodyMax = 50 << 20 @@ -519,7 +519,7 @@ func testGCAfterRebuildHTree(t *testing.T, store *HStore, bucketID, numRecPerFil // GC bkt = store.buckets[bucketID] - store.gcMgr.gc(bkt, 1, 3, true) + store.gcMgr.gc(ctx, ch, bkt, 1, 3, true) dir = utils.NewDir() dir.Set("000.data", int64(n)) dir.Set("000.000.idx.s", -1) @@ -567,7 +567,7 @@ func testGCAfterRebuildHTree(t *testing.T, store *HStore, bucketID, numRecPerFil // GC begin with 0 bkt = store.buckets[bucketID] - store.gcMgr.gc(bkt, 0, 1, true) + store.gcMgr.gc(ctx, ch, bkt, 0, 1, true) stat := bkt.GCHistory[len(bkt.GCHistory)-1] if stat.Begin > 0 { t.Fatalf("Begin 0") @@ -584,7 +584,7 @@ func testGCAfterRebuildHTree(t *testing.T, store *HStore, bucketID, numRecPerFil checkFiles(t, bkt.Home, dir) } -func testGCDeleteSame(t *testing.T, store *HStore, bucketID, numRecPerFile int) { +func testGCDeleteSame(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { gen := newKVGen(16) var ki KeyInfo @@ -624,7 +624,7 @@ func testGCDeleteSame(t *testing.T, store *HStore, bucketID, numRecPerFile int) // dir.Set("collision.yaml", -1) checkFiles(t, bkt.Home, dir) - store.gcMgr.gc(bkt, 0, 0, true) + store.gcMgr.gc(ctx, ch, bkt, 0, 0, true) for i := 0; i < N; i++ { payload := gen.gen(&ki, i, 1) payload2, pos, err := store.Get(&ki, false) @@ -680,7 +680,7 @@ func readHStore(t *testing.T, store *HStore, n, v int) { } } -func testGCMulti(t *testing.T, store *HStore, bucketID, numRecPerFile int) { +func testGCMulti(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { config.MCConf.BodyMax = 512 defer func() { config.MCConf.BodyMax = 50 << 20 @@ -786,7 +786,7 @@ func testGCMulti(t *testing.T, store *HStore, bucketID, numRecPerFile int) { // 002: [n/2 , n) // 004: -1 - store.gcMgr.gc(bkt, 1, 3, true) + 
store.gcMgr.gc(ctx, ch, bkt, 1, 3, true) stop = true readfunc() @@ -839,7 +839,7 @@ func testGCMulti(t *testing.T, store *HStore, bucketID, numRecPerFile int) { readfunc() } -func testGCToLast(t *testing.T, store *HStore, bucketID, numRecPerFile int) { +func testGCToLast(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { config.MCConf.BodyMax = 512 defer func() { config.MCConf.BodyMax = 50 << 20 @@ -887,7 +887,7 @@ func testGCToLast(t *testing.T, store *HStore, bucketID, numRecPerFile int) { t.Fatal(err) } store.flushdatas(true) - store.gcMgr.gc(bkt, 1, 1, true) + store.gcMgr.gc(ctx, ch, bkt, 1, 1, true) readfunc := func() { for i := 0; i < N; i++ { payload := gen.gen(&ki, i, 1) @@ -961,13 +961,19 @@ func TestGCDeleteSame(t *testing.T) { testGC(t, testGCDeleteSame, "deleteSame", 100) } -type testGCFunc func(t *testing.T, hstore *HStore, bucket, numRecPerFile int) +type testGCFunc func(t *testing.T, ctx context.Context, ch chan<- int, hstore *HStore, bucket, numRecPerFile int) // numRecPerFile should be even func testGC(t *testing.T, casefunc testGCFunc, name string, numRecPerFile int) { setupTest(fmt.Sprintf("testGC_%s", name)) + ctx, cancel := context.WithCancel(gcContext) + chunkCh := make(chan int, 1) defer clearTest() + defer func() { + cancel() + <-chunkCh + }() numbucket := 16 bkt := numbucket - 1 @@ -992,7 +998,7 @@ func testGC(t *testing.T, casefunc testGCFunc, name string, numRecPerFile int) { t.Fatal(err) } logger.Infof("%#v", Conf) - casefunc(t, store, bkt, numRecPerFile) + casefunc(t, ctx, chunkCh, store, bkt, numRecPerFile) if !cmem.DBRL.IsZero() { t.Fatalf("%#v", cmem.DBRL) @@ -1007,7 +1013,13 @@ func TestGCConcurrency(t *testing.T) { func testGCConcurrency(t *testing.T, numRecPerFile int) { setupTest(fmt.Sprintf("testGC_%s", "testGCConcurrency")) + ctx, cancel := context.WithCancel(gcContext) + chunkCh := make(chan int, 1) defer clearTest() + defer func() { + cancel() + <-chunkCh + }() numbucket := 16 Conf.NumBucket = numbucket @@ -1037,7 +1049,7 @@ func testGCConcurrency(t *testing.T, numRecPerFile int) { t.Fatal(err) } logger.Infof("%#v", Conf) - execConcurrencyTest(t, store, bucketIDs, numRecPerFile) + execConcurrencyTest(t, ctx, chunkCh, store, bucketIDs, numRecPerFile) if !cmem.DBRL.IsZero() { t.Fatalf("%#v", cmem.DBRL) @@ -1049,7 +1061,7 @@ func testGCConcurrency(t *testing.T, numRecPerFile int) { } } -func execConcurrencyTest(t *testing.T, store *HStore, bucketIDs []int, numRecPerFile int) { +func execConcurrencyTest(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketIDs []int, numRecPerFile int) { config.MCConf.BodyMax = 512 defer func() { config.MCConf.BodyMax = 50 << 20 @@ -1166,7 +1178,7 @@ func execConcurrencyTest(t *testing.T, store *HStore, bucketIDs []int, numRecPer var wg sync.WaitGroup gcExec := func(wgg *sync.WaitGroup, b *Bucket) { defer wgg.Done() - store.gcMgr.gc(b, 1, 3, true) + store.gcMgr.gc(ctx, ch, b, 1, 3, true) } for _, bucketID := range bucketIDs { wg.Add(1) @@ -1318,7 +1330,7 @@ func checkAllDataWithHints(dir string) error { return nil } -func testGCReserveDelete(t *testing.T, store *HStore, bucketID, numRecPerFile int) { +func testGCReserveDelete(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { gen := newKVGen(16) var ki KeyInfo logger.Infof("test gc all updated in the same file") @@ -1354,7 +1366,7 @@ func testGCReserveDelete(t *testing.T, store *HStore, bucketID, numRecPerFile in store.flushdatas(true) bkt := store.buckets[bucketID] 
- store.gcMgr.gc(bkt, 1, 1, true) + store.gcMgr.gc(ctx, ch, bkt, 1, 1, true) gen.gen(&ki, 0, 0) payload2, _, err := store.Get(&ki, false) @@ -1412,7 +1424,13 @@ func TestHStoreCollision(t *testing.T) { func TestChunk256(t *testing.T) { Conf.InitDefault() setupTest("TestChunk256") + ctx, cancel := context.WithCancel(gcContext) + ch := make(chan int, 1) defer clearTest() + defer func() { + cancel() + <-ch + }() numbucket := 16 bucketID := numbucket - 1 @@ -1475,7 +1493,7 @@ func TestChunk256(t *testing.T) { } } bkt := store.buckets[bucketID] - store.gcMgr.gc(bkt, 0, N-1, true) + store.gcMgr.gc(ctx, ch, bkt, 0, N-1, true) } store.Close() } From 17378b05854ae8d18e708e5a6585fef7923d59dc Mon Sep 17 00:00:00 2001 From: ZHAO Date: Mon, 22 Apr 2019 16:56:58 +0800 Subject: [PATCH 04/15] cancel gc --- gobeansdb/web.go | 9 ++- store/data.go | 2 + store/gc.go | 167 +++++++++++++++++++++---------------------- store/hstore.go | 16 ++--- store/hstore_test.go | 19 ++--- 5 files changed, 103 insertions(+), 110 deletions(-) diff --git a/gobeansdb/web.go b/gobeansdb/web.go index 9405c62..755a67f 100644 --- a/gobeansdb/web.go +++ b/gobeansdb/web.go @@ -317,6 +317,7 @@ func handleGC(w http.ResponseWriter, r *http.Request) { isQuery := getGCQuery(r) if isQuery { result = storage.hstore.GCBuckets() + w.Write([]byte(result)) return } @@ -350,7 +351,13 @@ func handleGC(w http.ResponseWriter, r *http.Request) { gcCancel := s == "true" if gcCancel { - result := storage.hstore.CancelGC(bktID) + var result string + bktID, chunkID := storage.hstore.CancelGC(bktID) + if chunkID == -1 { + result = fmt.Sprintf("bucket %d is not gcing", bktID) + } else { + result = fmt.Sprintf("cancel gc on bucket %d, chunk: %d", bktID, chunkID) + } fmt.Fprint(w, result) return } diff --git a/store/data.go b/store/data.go index d5a6de7..78a348f 100644 --- a/store/data.go +++ b/store/data.go @@ -132,7 +132,9 @@ func (ds *dataStore) flush(chunk int, force bool) error { filessize, w.offset, ds.genPath(chunk), &ds.chunks[chunk]) } nflushed, err := ds.chunks[chunk].flush(w, false) + ds.Lock() ds.wbufSize -= nflushed + ds.Unlock() w.Close() return nil diff --git a/store/gc.go b/store/gc.go index a29638d..defc480 100644 --- a/store/gc.go +++ b/store/gc.go @@ -233,21 +233,9 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk defer func() { dstchunk.endGCWriting() bkt.hints.trydump(gc.Dst, true) - }() - - go func() { - select { - case <-ctx.Done(): - tmp := gc.End - if gc.Src <= gc.End { - gc.End = gc.Src - } - logger.Infof("change bucket %d gc end to current [%d -> %d]", bkt.ID, tmp, gc.End) - ch <- gc.End - gcContextMap.rw.Lock() - delete(gcContextMap.m, bkt.ID) - gcContextMap.rw.Unlock() - } + gcContextMap.rw.Lock() + delete(gcContextMap.m, bkt.ID) + gcContextMap.rw.Unlock() }() for gc.Src = gc.Begin; gc.Src <= gc.End; gc.Src++ { @@ -265,91 +253,98 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk logger.Errorf("gc failed: %s", err.Error()) return } - +READRECORD: for { - var sizeBroken uint32 - rec, oldPos.Offset, sizeBroken, err = r.Next() - if err != nil { - gc.Err = err - logger.Errorf("gc failed: %s", err.Error()) + select { + case <-ctx.Done(): + logger.Infof("cancel gc, src: %d, dst: %d", gc.Src, gc.Dst) + ch <- gc.Src return - } - if rec == nil { - break - } + default: + var sizeBroken uint32 + rec, oldPos.Offset, sizeBroken, err = r.Next() + if err != nil { + gc.Err = err + logger.Errorf("gc failed: %s", err.Error()) + return + } + if rec == nil { + break READRECORD + } - 
var isNewest, isCoverdByCollision, isDeleted bool - meta := rec.Payload.Meta - ki := NewKeyInfoFromBytes(rec.Key, getKeyHash(rec.Key), false) - treeMeta, treePos, found := bkt.htree.get(ki) - if found { - if oldPos == treePos { // easy - meta.ValueHash = treeMeta.ValueHash - isNewest = true - } else { - isDeleted = treeMeta.Ver < 0 - hintit, hintchunkid, isCoverdByCollision := bkt.hints.getCollisionGC(ki) - if isCoverdByCollision { - if hintit != nil { - p := Position{hintchunkid, hintit.Pos.Offset} - if p == oldPos { - isNewest = true - meta.ValueHash = hintit.Vhash + var isNewest, isCoverdByCollision, isDeleted bool + meta := rec.Payload.Meta + ki := NewKeyInfoFromBytes(rec.Key, getKeyHash(rec.Key), false) + treeMeta, treePos, found := bkt.htree.get(ki) + if found { + if oldPos == treePos { // easy + meta.ValueHash = treeMeta.ValueHash + isNewest = true + } else { + isDeleted = treeMeta.Ver < 0 + hintit, hintchunkid, isCoverdByCollision := bkt.hints.getCollisionGC(ki) + if isCoverdByCollision { + if hintit != nil { + p := Position{hintchunkid, hintit.Pos.Offset} + if p == oldPos { + isNewest = true + meta.ValueHash = hintit.Vhash + } + } else { + isNewest = true // guess + meta.ValueHash = rec.Payload.Getvhash() } - } else { - isNewest = true // guess - meta.ValueHash = rec.Payload.Getvhash() } } + } else { + // when rebuiding the HTree, the deleted recs are removed from HTree + // we are not sure whether the `set rec` is still in datafiles while `auto GC`, so we need to write a copy of `del rec` in datafile. + // but we can remove the `del rec` while GC begin with 0 + fileState.NumNotInHtree++ + if gc.Begin > 0 && rec.Payload.Ver < 0 { + isNewest = true + } } - } else { - // when rebuiding the HTree, the deleted recs are removed from HTree - // we are not sure whether the `set rec` is still in datafiles while `auto GC`, so we need to write a copy of `del rec` in datafile. 
- // but we can remove the `del rec` while GC begin with 0 - fileState.NumNotInHtree++ - if gc.Begin > 0 && rec.Payload.Ver < 0 { - isNewest = true - } - } - - wrec := wrapRecord(rec) - recsize := wrec.rec.Payload.RecSize - fileState.addRecord(recsize, isNewest, isDeleted, sizeBroken) - //logger.Infof("key stat: %v %v %v %v", ki.StringKey, isNewest, isCoverdByCollision, isDeleted) - if !isNewest { - continue - } - if recsize+dstchunk.writingHead > uint32(Conf.DataFileMax) { - dstchunk.endGCWriting() - bkt.hints.trydump(gc.Dst, true) + wrec := wrapRecord(rec) + recsize := wrec.rec.Payload.RecSize + fileState.addRecord(recsize, isNewest, isDeleted, sizeBroken) + //logger.Infof("key stat: %v %v %v %v", ki.StringKey, isNewest, isCoverdByCollision, isDeleted) + if !isNewest { + continue READRECORD + } - gc.Dst++ - newPos.ChunkID = gc.Dst - logger.Infof("continue GC bucket %d, file %d -> %d", bkt.ID, gc.Src, gc.Dst) - dstchunk = &bkt.datas.chunks[gc.Dst] - err = dstchunk.beginGCWriting(gc.Src) - if err != nil { + if recsize+dstchunk.writingHead > uint32(Conf.DataFileMax) { + dstchunk.endGCWriting() + bkt.hints.trydump(gc.Dst, true) + + gc.Dst++ + newPos.ChunkID = gc.Dst + logger.Infof("continue GC bucket %d, file %d -> %d", bkt.ID, gc.Src, gc.Dst) + dstchunk = &bkt.datas.chunks[gc.Dst] + err = dstchunk.beginGCWriting(gc.Src) + if err != nil { + gc.Err = err + return + } + } + if newPos.Offset, err = dstchunk.AppendRecordGC(wrec); err != nil { gc.Err = err + logger.Errorf("gc failed: %s", err.Error()) return } - } - if newPos.Offset, err = dstchunk.AppendRecordGC(wrec); err != nil { - gc.Err = err - logger.Errorf("gc failed: %s", err.Error()) - return - } - // logger.Infof("%s %v %v", ki.StringKey, newPos, meta) - if found { - if isCoverdByCollision { - mgr.UpdateCollision(bkt, ki, oldPos, newPos, rec) + // logger.Infof("%s %v %v", ki.StringKey, newPos, meta) + if found { + if isCoverdByCollision { + mgr.UpdateCollision(bkt, ki, oldPos, newPos, rec) + } + mgr.UpdateHtreePos(bkt, ki, oldPos, newPos) } - mgr.UpdateHtreePos(bkt, ki, oldPos, newPos) - } - rotated := bkt.hints.set(ki, &meta, newPos, recsize, "gc") - if rotated { - bkt.hints.trydump(gc.Dst, false) + rotated := bkt.hints.set(ki, &meta, newPos, recsize, "gc") + if rotated { + bkt.hints.trydump(gc.Dst, false) + } } } diff --git a/store/hstore.go b/store/hstore.go index 90aa206..6210e32 100644 --- a/store/hstore.go +++ b/store/hstore.go @@ -306,22 +306,22 @@ func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, return } -func (store *HStore) CancelGC(bucketID int) (result string) { - // prevent gc same bucket concurrent +func (store *HStore) CancelGC(bucketID int) (bktID, chunkID int) { + // prevent same bucket concurrent request gcLock.Lock() defer gcLock.Unlock() - // will delete key in goroutine + // will delete key at goroutine in store/gc.go gcContextMap.rw.RLock() - gcCancelCtx, ok := gcContextMap.m[bucketID] + bktGCCancelCtx, ok := gcContextMap.m[bucketID] gcContextMap.rw.RUnlock() + bktID = bucketID if ok { - gcCancelCtx.Cancel() - chunkID := <-gcCancelCtx.ChunkChan - result = fmt.Sprintf("cancel gc on bucket %d when gc chunk %d finished", bucketID, chunkID) + bktGCCancelCtx.Cancel() + chunkID = <-bktGCCancelCtx.ChunkChan } else { - result = fmt.Sprintf("bucket %d not gcing", bucketID) + chunkID = -1 } return } diff --git a/store/hstore_test.go b/store/hstore_test.go index 7f17958..797a687 100644 --- a/store/hstore_test.go +++ b/store/hstore_test.go @@ -967,13 +967,9 @@ type testGCFunc func(t *testing.T, 
ctx context.Context, ch chan<- int, hstore *H func testGC(t *testing.T, casefunc testGCFunc, name string, numRecPerFile int) { setupTest(fmt.Sprintf("testGC_%s", name)) - ctx, cancel := context.WithCancel(gcContext) + ctx, _ := context.WithCancel(gcContext) chunkCh := make(chan int, 1) defer clearTest() - defer func() { - cancel() - <-chunkCh - }() numbucket := 16 bkt := numbucket - 1 @@ -1007,19 +1003,16 @@ func testGC(t *testing.T, casefunc testGCFunc, name string, numRecPerFile int) { store.Close() checkAllDataWithHints(bucketDir) } + func TestGCConcurrency(t *testing.T) { testGCConcurrency(t, 10000) } func testGCConcurrency(t *testing.T, numRecPerFile int) { setupTest(fmt.Sprintf("testGC_%s", "testGCConcurrency")) - ctx, cancel := context.WithCancel(gcContext) + ctx, _ := context.WithCancel(gcContext) chunkCh := make(chan int, 1) defer clearTest() - defer func() { - cancel() - <-chunkCh - }() numbucket := 16 Conf.NumBucket = numbucket @@ -1424,13 +1417,9 @@ func TestHStoreCollision(t *testing.T) { func TestChunk256(t *testing.T) { Conf.InitDefault() setupTest("TestChunk256") - ctx, cancel := context.WithCancel(gcContext) + ctx, _ := context.WithCancel(gcContext) ch := make(chan int, 1) defer clearTest() - defer func() { - cancel() - <-ch - }() numbucket := 16 bucketID := numbucket - 1 From 69ae52cd588b463df6ebad7af53b8b7b08fef651 Mon Sep 17 00:00:00 2001 From: ZHAO Date: Mon, 22 Apr 2019 17:50:00 +0800 Subject: [PATCH 05/15] cancel gc --- store/gc.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/store/gc.go b/store/gc.go index defc480..bfdafa7 100644 --- a/store/gc.go +++ b/store/gc.go @@ -202,6 +202,9 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk mgr.mu.Lock() delete(mgr.stat, bkt.ID) mgr.mu.Unlock() + gcContextMap.rw.Lock() + delete(gcContextMap.m, bkt.ID) + gcContextMap.rw.Unlock() gc.EndTS = time.Now() }() gc.Begin = startChunkID @@ -233,9 +236,6 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk defer func() { dstchunk.endGCWriting() bkt.hints.trydump(gc.Dst, true) - gcContextMap.rw.Lock() - delete(gcContextMap.m, bkt.ID) - gcContextMap.rw.Unlock() }() for gc.Src = gc.Begin; gc.Src <= gc.End; gc.Src++ { @@ -253,7 +253,7 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk logger.Errorf("gc failed: %s", err.Error()) return } -READRECORD: + READRECORD: for { select { case <-ctx.Done(): From 67869feae5e2e67b2eea8f866288cd08ea1f3f40 Mon Sep 17 00:00:00 2001 From: ZHAO Date: Mon, 22 Apr 2019 17:53:26 +0800 Subject: [PATCH 06/15] cancel gc --- store/hstore_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/store/hstore_test.go b/store/hstore_test.go index 797a687..91e391b 100644 --- a/store/hstore_test.go +++ b/store/hstore_test.go @@ -5,9 +5,6 @@ import ( "context" "flag" "fmt" - "github.com/douban/gobeansdb/cmem" - "github.com/douban/gobeansdb/config" - "github.com/douban/gobeansdb/utils" "os" "path/filepath" "reflect" @@ -16,6 +13,10 @@ import ( "strings" "sync" "testing" + + "github.com/douban/gobeansdb/cmem" + "github.com/douban/gobeansdb/config" + "github.com/douban/gobeansdb/utils" ) var ( From 74a33a4568670f8d57b078d078c7b8305b810b36 Mon Sep 17 00:00:00 2001 From: ZHAO Date: Tue, 23 Apr 2019 11:09:04 +0800 Subject: [PATCH 07/15] cancel gc --- gobeansdb/web.go | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/gobeansdb/web.go b/gobeansdb/web.go index 755a67f..0ed6417 
100644 --- a/gobeansdb/web.go +++ b/gobeansdb/web.go @@ -317,17 +317,34 @@ func handleGC(w http.ResponseWriter, r *http.Request) { isQuery := getGCQuery(r) if isQuery { result = storage.hstore.GCBuckets() - w.Write([]byte(result)) + w.Write([]byte("gcing: " + result)) return } + defer func() { + if err != nil { + e := fmt.Sprintf("
<p> err : %s </p>", err.Error()) + w.Write([]byte(e)) + showBucket(w, "gc") + } else { + if !pretend { + result2 := fmt.Sprintf("<a href='/bucket/%d'> /bucket/%d </a> <p/>
", bucketID, bucketID) + result = result2 + result + } + w.Write([]byte(result)) + } + }() + bucketID, err = getBucket(r) if err != nil { return } bktID := int(bucketID) - r.ParseForm() + err = r.ParseForm() + if err != nil { + return + } start, err := getFormValueInt(r, "start", -1) if err != nil { return @@ -351,31 +368,16 @@ func handleGC(w http.ResponseWriter, r *http.Request) { gcCancel := s == "true" if gcCancel { - var result string bktID, chunkID := storage.hstore.CancelGC(bktID) if chunkID == -1 { result = fmt.Sprintf("bucket %d is not gcing", bktID) } else { result = fmt.Sprintf("cancel gc on bucket %d, chunk: %d", bktID, chunkID) } - fmt.Fprint(w, result) - return - } - - start, end, err = storage.hstore.GC(bktID, start, end, noGCDays, merge, pretend) - - if err != nil { - e := fmt.Sprintf("
<p> err : %s </p>", err.Error()) - w.Write([]byte(e)) - showBucket(w, "gc") } else { + start, end, err = storage.hstore.GC(bktID, start, end, noGCDays, merge, pretend) result = fmt.Sprintf("<p> bucket %d, start %d, end %d, merge %v, pretend %v </p>", bucketID, start, end, merge, pretend) - if !pretend { - result2 := fmt.Sprintf("<a href='/bucket/%d'> /bucket/%d </a> <p/>
", bucketID, bucketID) - result = result2 + result - } - w.Write([]byte(result)) } return } From 3fde8064bc2ec3679dd7995e80ad130ea2ca7091 Mon Sep 17 00:00:00 2001 From: ZHAO Date: Tue, 23 Apr 2019 15:00:39 +0800 Subject: [PATCH 08/15] cancel gc --- gobeansdb/web.go | 7 ++- store/gc.go | 154 ++++++++++++++++++++++++----------------------- store/hstore.go | 14 ++--- 3 files changed, 90 insertions(+), 85 deletions(-) diff --git a/gobeansdb/web.go b/gobeansdb/web.go index 0ed6417..cbcd9af 100644 --- a/gobeansdb/web.go +++ b/gobeansdb/web.go @@ -316,8 +316,9 @@ func handleGC(w http.ResponseWriter, r *http.Request) { isQuery := getGCQuery(r) if isQuery { - result = storage.hstore.GCBuckets() - w.Write([]byte("gcing: " + result)) + gcResult := storage.hstore.GCBuckets() + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(gcResult) return } @@ -368,7 +369,7 @@ func handleGC(w http.ResponseWriter, r *http.Request) { gcCancel := s == "true" if gcCancel { - bktID, chunkID := storage.hstore.CancelGC(bktID) + chunkID := storage.hstore.CancelGC(bktID) if chunkID == -1 { result = fmt.Sprintf("bucket %d is not gcing", bktID) } else { diff --git a/store/gc.go b/store/gc.go index bfdafa7..fdd8688 100644 --- a/store/gc.go +++ b/store/gc.go @@ -203,7 +203,10 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk delete(mgr.stat, bkt.ID) mgr.mu.Unlock() gcContextMap.rw.Lock() - delete(gcContextMap.m, bkt.ID) + _, exist := gcContextMap.m[bkt.ID] + if exist { + delete(gcContextMap.m, bkt.ID) + } gcContextMap.rw.Unlock() gc.EndTS = time.Now() }() @@ -253,98 +256,101 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk logger.Errorf("gc failed: %s", err.Error()) return } - READRECORD: + for { select { case <-ctx.Done(): logger.Infof("cancel gc, src: %d, dst: %d", gc.Src, gc.Dst) + gcContextMap.rw.Lock() + delete(gcContextMap.m, bkt.ID) + gcContextMap.rw.Unlock() ch <- gc.Src return default: - var sizeBroken uint32 - rec, oldPos.Offset, sizeBroken, err = r.Next() - if err != nil { - gc.Err = err - logger.Errorf("gc failed: %s", err.Error()) - return - } - if rec == nil { - break READRECORD - } + } + var sizeBroken uint32 + rec, oldPos.Offset, sizeBroken, err = r.Next() + if err != nil { + gc.Err = err + logger.Errorf("gc failed: %s", err.Error()) + return + } + if rec == nil { + break + } - var isNewest, isCoverdByCollision, isDeleted bool - meta := rec.Payload.Meta - ki := NewKeyInfoFromBytes(rec.Key, getKeyHash(rec.Key), false) - treeMeta, treePos, found := bkt.htree.get(ki) - if found { - if oldPos == treePos { // easy - meta.ValueHash = treeMeta.ValueHash - isNewest = true - } else { - isDeleted = treeMeta.Ver < 0 - hintit, hintchunkid, isCoverdByCollision := bkt.hints.getCollisionGC(ki) - if isCoverdByCollision { - if hintit != nil { - p := Position{hintchunkid, hintit.Pos.Offset} - if p == oldPos { - isNewest = true - meta.ValueHash = hintit.Vhash - } - } else { - isNewest = true // guess - meta.ValueHash = rec.Payload.Getvhash() + var isNewest, isCoverdByCollision, isDeleted bool + meta := rec.Payload.Meta + ki := NewKeyInfoFromBytes(rec.Key, getKeyHash(rec.Key), false) + treeMeta, treePos, found := bkt.htree.get(ki) + if found { + if oldPos == treePos { // easy + meta.ValueHash = treeMeta.ValueHash + isNewest = true + } else { + isDeleted = treeMeta.Ver < 0 + hintit, hintchunkid, isCoverdByCollision := bkt.hints.getCollisionGC(ki) + if isCoverdByCollision { + if hintit != nil { + p := Position{hintchunkid, 
hintit.Pos.Offset} + if p == oldPos { + isNewest = true + meta.ValueHash = hintit.Vhash } + } else { + isNewest = true // guess + meta.ValueHash = rec.Payload.Getvhash() } } - } else { - // when rebuiding the HTree, the deleted recs are removed from HTree - // we are not sure whether the `set rec` is still in datafiles while `auto GC`, so we need to write a copy of `del rec` in datafile. - // but we can remove the `del rec` while GC begin with 0 - fileState.NumNotInHtree++ - if gc.Begin > 0 && rec.Payload.Ver < 0 { - isNewest = true - } } - - wrec := wrapRecord(rec) - recsize := wrec.rec.Payload.RecSize - fileState.addRecord(recsize, isNewest, isDeleted, sizeBroken) - //logger.Infof("key stat: %v %v %v %v", ki.StringKey, isNewest, isCoverdByCollision, isDeleted) - if !isNewest { - continue READRECORD + } else { + // when rebuiding the HTree, the deleted recs are removed from HTree + // we are not sure whether the `set rec` is still in datafiles while `auto GC`, so we need to write a copy of `del rec` in datafile. + // but we can remove the `del rec` while GC begin with 0 + fileState.NumNotInHtree++ + if gc.Begin > 0 && rec.Payload.Ver < 0 { + isNewest = true } + } - if recsize+dstchunk.writingHead > uint32(Conf.DataFileMax) { - dstchunk.endGCWriting() - bkt.hints.trydump(gc.Dst, true) - - gc.Dst++ - newPos.ChunkID = gc.Dst - logger.Infof("continue GC bucket %d, file %d -> %d", bkt.ID, gc.Src, gc.Dst) - dstchunk = &bkt.datas.chunks[gc.Dst] - err = dstchunk.beginGCWriting(gc.Src) - if err != nil { - gc.Err = err - return - } - } - if newPos.Offset, err = dstchunk.AppendRecordGC(wrec); err != nil { + wrec := wrapRecord(rec) + recsize := wrec.rec.Payload.RecSize + fileState.addRecord(recsize, isNewest, isDeleted, sizeBroken) + //logger.Infof("key stat: %v %v %v %v", ki.StringKey, isNewest, isCoverdByCollision, isDeleted) + if !isNewest { + continue + } + + if recsize+dstchunk.writingHead > uint32(Conf.DataFileMax) { + dstchunk.endGCWriting() + bkt.hints.trydump(gc.Dst, true) + + gc.Dst++ + newPos.ChunkID = gc.Dst + logger.Infof("continue GC bucket %d, file %d -> %d", bkt.ID, gc.Src, gc.Dst) + dstchunk = &bkt.datas.chunks[gc.Dst] + err = dstchunk.beginGCWriting(gc.Src) + if err != nil { gc.Err = err - logger.Errorf("gc failed: %s", err.Error()) return } - // logger.Infof("%s %v %v", ki.StringKey, newPos, meta) - if found { - if isCoverdByCollision { - mgr.UpdateCollision(bkt, ki, oldPos, newPos, rec) - } - mgr.UpdateHtreePos(bkt, ki, oldPos, newPos) + } + if newPos.Offset, err = dstchunk.AppendRecordGC(wrec); err != nil { + gc.Err = err + logger.Errorf("gc failed: %s", err.Error()) + return + } + // logger.Infof("%s %v %v", ki.StringKey, newPos, meta) + if found { + if isCoverdByCollision { + mgr.UpdateCollision(bkt, ki, oldPos, newPos, rec) } + mgr.UpdateHtreePos(bkt, ki, oldPos, newPos) + } - rotated := bkt.hints.set(ki, &meta, newPos, recsize, "gc") - if rotated { - bkt.hints.trydump(gc.Dst, false) - } + rotated := bkt.hints.set(ki, &meta, newPos, recsize, "gc") + if rotated { + bkt.hints.trydump(gc.Dst, false) } } diff --git a/store/hstore.go b/store/hstore.go index 6210e32..737b738 100644 --- a/store/hstore.go +++ b/store/hstore.go @@ -7,7 +7,6 @@ import ( "os" "path/filepath" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -253,15 +252,15 @@ func (store *HStore) ListDir(ki *KeyInfo) ([]byte, error) { return store.ListUpper(ki) } -func (store *HStore) GCBuckets() string { - var result strings.Builder +func (store *HStore) GCBuckets() []string { + store.gcMgr.mu.Lock() + result := 
make([]string, 0, len(store.gcMgr.stat)) for k := range store.gcMgr.stat { - result.WriteString(strconv.FormatInt(int64(k), 16)) - result.WriteString(",") + result = append(result, strconv.FormatInt(int64(k), 16)) } store.gcMgr.mu.Unlock() - return strings.TrimSuffix(result.String(), ",") + return result } func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, pretend bool) (begin, end int, err error) { @@ -306,7 +305,7 @@ func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, return } -func (store *HStore) CancelGC(bucketID int) (bktID, chunkID int) { +func (store *HStore) CancelGC(bucketID int) (chunkID int) { // prevent same bucket concurrent request gcLock.Lock() defer gcLock.Unlock() @@ -316,7 +315,6 @@ func (store *HStore) CancelGC(bucketID int) (bktID, chunkID int) { bktGCCancelCtx, ok := gcContextMap.m[bucketID] gcContextMap.rw.RUnlock() - bktID = bucketID if ok { bktGCCancelCtx.Cancel() chunkID = <-bktGCCancelCtx.ChunkChan From 3059db01f468c2b31d8b55b1b28841bda870468b Mon Sep 17 00:00:00 2001 From: ZHAO Date: Tue, 23 Apr 2019 15:05:41 +0800 Subject: [PATCH 09/15] cancel gc --- store/gc.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/store/gc.go b/store/gc.go index fdd8688..4278463 100644 --- a/store/gc.go +++ b/store/gc.go @@ -203,10 +203,7 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk delete(mgr.stat, bkt.ID) mgr.mu.Unlock() gcContextMap.rw.Lock() - _, exist := gcContextMap.m[bkt.ID] - if exist { - delete(gcContextMap.m, bkt.ID) - } + delete(gcContextMap.m, bkt.ID) gcContextMap.rw.Unlock() gc.EndTS = time.Now() }() From 8adefd28a29d689064006fe022ff80eca6518e3a Mon Sep 17 00:00:00 2001 From: ZHAO Date: Tue, 23 Apr 2019 18:09:34 +0800 Subject: [PATCH 10/15] fix cancel gc --- gobeansdb/web.go | 6 +++--- store/gc.go | 38 ++++++++++++++++++---------------- store/hstore.go | 36 +++++++++++--------------------- store/hstore_test.go | 49 +++++++++++++++++++------------------------- 4 files changed, 56 insertions(+), 73 deletions(-) diff --git a/gobeansdb/web.go b/gobeansdb/web.go index cbcd9af..4376299 100644 --- a/gobeansdb/web.go +++ b/gobeansdb/web.go @@ -369,11 +369,11 @@ func handleGC(w http.ResponseWriter, r *http.Request) { gcCancel := s == "true" if gcCancel { - chunkID := storage.hstore.CancelGC(bktID) - if chunkID == -1 { + src, dst := storage.hstore.CancelGC(bktID) + if src == -1 { result = fmt.Sprintf("bucket %d is not gcing", bktID) } else { - result = fmt.Sprintf("cancel gc on bucket %d, chunk: %d", bktID, chunkID) + result = fmt.Sprintf("cancel gc on bucket %d, src: %d -> dst: %d", bktID, src, dst) } } else { start, end, err = storage.hstore.GC(bktID, start, end, noGCDays, merge, pretend) diff --git a/store/gc.go b/store/gc.go index 4278463..9fcbff2 100644 --- a/store/gc.go +++ b/store/gc.go @@ -10,7 +10,7 @@ import ( ) type GCCancelCtx struct { - Cancel context.CancelFunc + Ctx context.Context ChunkChan chan int } @@ -32,6 +32,7 @@ type GCState struct { Dst int Err error + CancelFlag bool // sum GCFileState } @@ -187,7 +188,7 @@ func (bkt *Bucket) gcCheckRange(startChunkID, endChunkID, noGCDays int) (start, return } -func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunkID, endChunkID int, merge bool) { +func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) { logger.Infof("begin GC bucket %d chunk [%d, %d]", bkt.ID, startChunkID, endChunkID) @@ -202,9 +203,6 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- 
int, bkt *Bucket, startChunk mgr.mu.Lock() delete(mgr.stat, bkt.ID) mgr.mu.Unlock() - gcContextMap.rw.Lock() - delete(gcContextMap.m, bkt.ID) - gcContextMap.rw.Unlock() gc.EndTS = time.Now() }() gc.Begin = startChunkID @@ -219,10 +217,20 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk defer mgr.AfterBucket(bkt) gc.Dst = startChunkID - for i := 0; i < startChunkID; i++ { + // try to find the nearest chunk that small than start chunk + for i := startChunkID - 1; i >= 0; i-- { sz := bkt.datas.chunks[i].size - if sz > 0 && (int64(sz) < Conf.DataFileMax-config.MCConf.BodyMax) { - gc.Dst = i + if sz > 0 { + if int64(sz) < Conf.DataFileMax-config.MCConf.BodyMax { + // previous one available and not full, use it. + gc.Dst = i + break + } else { + if i < startChunkID-1 { // not previous one + gc.Dst = i + 1 + } + break + } } } @@ -239,6 +247,10 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk }() for gc.Src = gc.Begin; gc.Src <= gc.End; gc.Src++ { + if gc.CancelFlag { + logger.Infof("GC canceled: src %d dst %d", gc.Src, gc.Dst) + return + } if bkt.datas.chunks[gc.Src].size <= 0 { logger.Infof("skip empty chunk %d", gc.Src) continue @@ -255,16 +267,6 @@ func (mgr *GCMgr) gc(ctx context.Context, ch chan<- int, bkt *Bucket, startChunk } for { - select { - case <-ctx.Done(): - logger.Infof("cancel gc, src: %d, dst: %d", gc.Src, gc.Dst) - gcContextMap.rw.Lock() - delete(gcContextMap.m, bkt.ID) - gcContextMap.rw.Unlock() - ch <- gc.Src - return - default: - } var sizeBroken uint32 rec, oldPos.Offset, sizeBroken, err = r.Next() if err != nil { diff --git a/store/hstore.go b/store/hstore.go index 737b738..553478c 100644 --- a/store/hstore.go +++ b/store/hstore.go @@ -2,7 +2,6 @@ package store import ( "bytes" - "context" "fmt" "os" "path/filepath" @@ -18,14 +17,9 @@ import ( ) var ( - logger = loghub.ErrorLogger - mergeChan chan int - gcLock sync.Mutex - gcContext = context.Background() - gcContextMap = struct { - m map[int]GCCancelCtx - rw sync.RWMutex - }{m: make(map[int]GCCancelCtx)} + logger = loghub.ErrorLogger + mergeChan chan int + gcLock sync.Mutex ) type HStore struct { @@ -295,31 +289,25 @@ func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, return } - ctx, cancel := context.WithCancel(gcContext) - gcContextMap.rw.Lock() - gcContextMap.m[bucketID] = GCCancelCtx{Cancel: cancel, ChunkChan: make(chan int, 1)} - ch := gcContextMap.m[bucketID].ChunkChan - gcContextMap.rw.Unlock() - - go store.gcMgr.gc(ctx, ch, bkt, begin, end, merge) + go store.gcMgr.gc(bkt, begin, end, merge) return } -func (store *HStore) CancelGC(bucketID int) (chunkID int) { +func (store *HStore) CancelGC(bucketID int) (src, dst int) { // prevent same bucket concurrent request gcLock.Lock() defer gcLock.Unlock() // will delete key at goroutine in store/gc.go - gcContextMap.rw.RLock() - bktGCCancelCtx, ok := gcContextMap.m[bucketID] - gcContextMap.rw.RUnlock() + store.gcMgr.mu.RLock() + stat, exists := store.gcMgr.stat[bucketID] + store.gcMgr.mu.RUnlock() - if ok { - bktGCCancelCtx.Cancel() - chunkID = <-bktGCCancelCtx.ChunkChan + if exists { + stat.CancelFlag = true + src, dst = stat.Src, stat.Dst } else { - chunkID = -1 + src, dst = -1, -1 } return } diff --git a/store/hstore_test.go b/store/hstore_test.go index 91e391b..3b8d2af 100644 --- a/store/hstore_test.go +++ b/store/hstore_test.go @@ -2,7 +2,6 @@ package store import ( "bytes" - "context" "flag" "fmt" "os" @@ -284,7 +283,7 @@ func checkDataSize(t *testing.T, ds *dataStore, 
sizes0 []uint32) { } } -func testGCUpdateSame(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { +func testGCUpdateSame(t *testing.T, store *HStore, bucketID, numRecPerFile int) { gen := newKVGen(16) var ki KeyInfo @@ -313,7 +312,7 @@ func testGCUpdateSame(t *testing.T, ctx context.Context, ch chan<- int, store *H } store.flushdatas(true) bkt := store.buckets[bucketID] - store.gcMgr.gc(ctx, ch, bkt, 0, 0, true) + store.gcMgr.gc(bkt, 0, 0, true) for i := 0; i < N; i++ { payload := gen.gen(&ki, i, 1) @@ -348,7 +347,7 @@ func testGCUpdateSame(t *testing.T, ctx context.Context, ch chan<- int, store *H } } -func testGCNothing(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { +func testGCNothing(t *testing.T, store *HStore, bucketID, numRecPerFile int) { gen := newKVGen(16) var ki KeyInfo @@ -370,7 +369,7 @@ func testGCNothing(t *testing.T, ctx context.Context, ch chan<- int, store *HSto store.flushdatas(true) bkt := store.buckets[bucketID] - store.gcMgr.gc(ctx, ch, bkt, 0, 0, true) + store.gcMgr.gc(bkt, 0, 0, true) for i := 0; i < N; i++ { payload := gen.gen(&ki, i, 0) payload2, pos, err := store.Get(&ki, false) @@ -405,7 +404,7 @@ func testGCNothing(t *testing.T, ctx context.Context, ch chan<- int, store *HSto } } -func testGCAfterRebuildHTree(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { +func testGCAfterRebuildHTree(t *testing.T, store *HStore, bucketID, numRecPerFile int) { config.MCConf.BodyMax = 512 defer func() { config.MCConf.BodyMax = 50 << 20 @@ -520,7 +519,7 @@ func testGCAfterRebuildHTree(t *testing.T, ctx context.Context, ch chan<- int, s // GC bkt = store.buckets[bucketID] - store.gcMgr.gc(ctx, ch, bkt, 1, 3, true) + store.gcMgr.gc(bkt, 1, 3, true) dir = utils.NewDir() dir.Set("000.data", int64(n)) dir.Set("000.000.idx.s", -1) @@ -568,7 +567,7 @@ func testGCAfterRebuildHTree(t *testing.T, ctx context.Context, ch chan<- int, s // GC begin with 0 bkt = store.buckets[bucketID] - store.gcMgr.gc(ctx, ch, bkt, 0, 1, true) + store.gcMgr.gc(bkt, 0, 1, true) stat := bkt.GCHistory[len(bkt.GCHistory)-1] if stat.Begin > 0 { t.Fatalf("Begin 0") @@ -585,7 +584,7 @@ func testGCAfterRebuildHTree(t *testing.T, ctx context.Context, ch chan<- int, s checkFiles(t, bkt.Home, dir) } -func testGCDeleteSame(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { +func testGCDeleteSame(t *testing.T, store *HStore, bucketID, numRecPerFile int) { gen := newKVGen(16) var ki KeyInfo @@ -625,7 +624,7 @@ func testGCDeleteSame(t *testing.T, ctx context.Context, ch chan<- int, store *H // dir.Set("collision.yaml", -1) checkFiles(t, bkt.Home, dir) - store.gcMgr.gc(ctx, ch, bkt, 0, 0, true) + store.gcMgr.gc(bkt, 0, 0, true) for i := 0; i < N; i++ { payload := gen.gen(&ki, i, 1) payload2, pos, err := store.Get(&ki, false) @@ -681,7 +680,7 @@ func readHStore(t *testing.T, store *HStore, n, v int) { } } -func testGCMulti(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { +func testGCMulti(t *testing.T, store *HStore, bucketID, numRecPerFile int) { config.MCConf.BodyMax = 512 defer func() { config.MCConf.BodyMax = 50 << 20 @@ -787,7 +786,7 @@ func testGCMulti(t *testing.T, ctx context.Context, ch chan<- int, store *HStore // 002: [n/2 , n) // 004: -1 - store.gcMgr.gc(ctx, ch, bkt, 1, 3, true) + store.gcMgr.gc(bkt, 1, 3, true) stop = true readfunc() @@ -840,7 +839,7 @@ func testGCMulti(t 
*testing.T, ctx context.Context, ch chan<- int, store *HStore readfunc() } -func testGCToLast(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { +func testGCToLast(t *testing.T, store *HStore, bucketID, numRecPerFile int) { config.MCConf.BodyMax = 512 defer func() { config.MCConf.BodyMax = 50 << 20 @@ -888,7 +887,7 @@ func testGCToLast(t *testing.T, ctx context.Context, ch chan<- int, store *HStor t.Fatal(err) } store.flushdatas(true) - store.gcMgr.gc(ctx, ch, bkt, 1, 1, true) + store.gcMgr.gc(bkt, 1, 1, true) readfunc := func() { for i := 0; i < N; i++ { payload := gen.gen(&ki, i, 1) @@ -962,14 +961,12 @@ func TestGCDeleteSame(t *testing.T) { testGC(t, testGCDeleteSame, "deleteSame", 100) } -type testGCFunc func(t *testing.T, ctx context.Context, ch chan<- int, hstore *HStore, bucket, numRecPerFile int) +type testGCFunc func(t *testing.T, hstore *HStore, bucket, numRecPerFile int) // numRecPerFile should be even func testGC(t *testing.T, casefunc testGCFunc, name string, numRecPerFile int) { setupTest(fmt.Sprintf("testGC_%s", name)) - ctx, _ := context.WithCancel(gcContext) - chunkCh := make(chan int, 1) defer clearTest() numbucket := 16 @@ -995,7 +992,7 @@ func testGC(t *testing.T, casefunc testGCFunc, name string, numRecPerFile int) { t.Fatal(err) } logger.Infof("%#v", Conf) - casefunc(t, ctx, chunkCh, store, bkt, numRecPerFile) + casefunc(t, store, bkt, numRecPerFile) if !cmem.DBRL.IsZero() { t.Fatalf("%#v", cmem.DBRL) @@ -1011,8 +1008,6 @@ func TestGCConcurrency(t *testing.T) { func testGCConcurrency(t *testing.T, numRecPerFile int) { setupTest(fmt.Sprintf("testGC_%s", "testGCConcurrency")) - ctx, _ := context.WithCancel(gcContext) - chunkCh := make(chan int, 1) defer clearTest() numbucket := 16 @@ -1043,7 +1038,7 @@ func testGCConcurrency(t *testing.T, numRecPerFile int) { t.Fatal(err) } logger.Infof("%#v", Conf) - execConcurrencyTest(t, ctx, chunkCh, store, bucketIDs, numRecPerFile) + execConcurrencyTest(t, store, bucketIDs, numRecPerFile) if !cmem.DBRL.IsZero() { t.Fatalf("%#v", cmem.DBRL) @@ -1055,7 +1050,7 @@ func testGCConcurrency(t *testing.T, numRecPerFile int) { } } -func execConcurrencyTest(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketIDs []int, numRecPerFile int) { +func execConcurrencyTest(t *testing.T, store *HStore, bucketIDs []int, numRecPerFile int) { config.MCConf.BodyMax = 512 defer func() { config.MCConf.BodyMax = 50 << 20 @@ -1172,7 +1167,7 @@ func execConcurrencyTest(t *testing.T, ctx context.Context, ch chan<- int, store var wg sync.WaitGroup gcExec := func(wgg *sync.WaitGroup, b *Bucket) { defer wgg.Done() - store.gcMgr.gc(ctx, ch, b, 1, 3, true) + store.gcMgr.gc(b, 1, 3, true) } for _, bucketID := range bucketIDs { wg.Add(1) @@ -1324,7 +1319,7 @@ func checkAllDataWithHints(dir string) error { return nil } -func testGCReserveDelete(t *testing.T, ctx context.Context, ch chan<- int, store *HStore, bucketID, numRecPerFile int) { +func testGCReserveDelete(t *testing.T, store *HStore, bucketID, numRecPerFile int) { gen := newKVGen(16) var ki KeyInfo logger.Infof("test gc all updated in the same file") @@ -1360,7 +1355,7 @@ func testGCReserveDelete(t *testing.T, ctx context.Context, ch chan<- int, store store.flushdatas(true) bkt := store.buckets[bucketID] - store.gcMgr.gc(ctx, ch, bkt, 1, 1, true) + store.gcMgr.gc(bkt, 1, 1, true) gen.gen(&ki, 0, 0) payload2, _, err := store.Get(&ki, false) @@ -1418,8 +1413,6 @@ func TestHStoreCollision(t *testing.T) { func TestChunk256(t *testing.T) { 
Conf.InitDefault() setupTest("TestChunk256") - ctx, _ := context.WithCancel(gcContext) - ch := make(chan int, 1) defer clearTest() numbucket := 16 @@ -1483,7 +1476,7 @@ func TestChunk256(t *testing.T) { } } bkt := store.buckets[bucketID] - store.gcMgr.gc(ctx, ch, bkt, 0, N-1, true) + store.gcMgr.gc(bkt, 0, N-1, true) } store.Close() } From 702d247344192def3c95e2d1b5e504f0fe268163 Mon Sep 17 00:00:00 2001 From: ZHAO Date: Tue, 23 Apr 2019 18:13:16 +0800 Subject: [PATCH 11/15] fix cancel gc --- store/gc.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/store/gc.go b/store/gc.go index 9fcbff2..59b743e 100644 --- a/store/gc.go +++ b/store/gc.go @@ -1,7 +1,6 @@ package store import ( - "context" "fmt" "sync" "time" @@ -9,11 +8,6 @@ import ( "github.com/douban/gobeansdb/config" ) -type GCCancelCtx struct { - Ctx context.Context - ChunkChan chan int -} - type GCMgr struct { mu sync.RWMutex stat map[int]*GCState // map[bucketID]*GCState @@ -31,7 +25,7 @@ type GCState struct { Src int Dst int - Err error + Err error CancelFlag bool // sum GCFileState From 2c7e832d881a525fadfea2712d452457ae244e75 Mon Sep 17 00:00:00 2001 From: ZHAO Date: Wed, 24 Apr 2019 11:38:42 +0800 Subject: [PATCH 12/15] revert merge release --- gobeansdb/web.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/gobeansdb/web.go b/gobeansdb/web.go index e51c749..4376299 100644 --- a/gobeansdb/web.go +++ b/gobeansdb/web.go @@ -336,12 +336,6 @@ func handleGC(w http.ResponseWriter, r *http.Request) { } }() - isQuery := getGCQuery(r) - if isQuery { - result = storage.hstore.GCBuckets() - return - } - bucketID, err = getBucket(r) if err != nil { return From 93bfb3f2414f218e98eccc66c3d5e7b84cdfd122 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Tue, 6 Aug 2019 18:12:29 +0800 Subject: [PATCH 13/15] add Running status for beansdbadmin (#26) --- store/gc.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/store/gc.go b/store/gc.go index 59b743e..408d5b0 100644 --- a/store/gc.go +++ b/store/gc.go @@ -25,6 +25,9 @@ type GCState struct { Src int Dst int + // For beansdbadmin check status. 
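+	// Set to true for the duration of a GC run and cleared in the
+	// deferred cleanup of GCMgr.gc, so pollers can observe in-flight runs.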
+	Running bool
+
 	Err        error
 	CancelFlag bool
 
 	// sum
@@ -192,11 +195,13 @@ func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) {
 	mgr.mu.Lock()
 	mgr.stat[bkt.ID] = gc
 	mgr.mu.Unlock()
+	gc.Running = true
 	gc.BeginTS = time.Now()
 	defer func() {
 		mgr.mu.Lock()
 		delete(mgr.stat, bkt.ID)
 		mgr.mu.Unlock()
+		gc.Running = false
 		gc.EndTS = time.Now()
 	}()
 	gc.Begin = startChunkID
From 9005584ac293f6818ed13f3b2bb4928cd4a92bb0 Mon Sep 17 00:00:00 2001
From: Yijun Zhao
Date: Mon, 12 Aug 2019 11:30:34 +0800
Subject: [PATCH 14/15] add more info for query gc status (#27)

* add more info for query gc status

* show remain chunks for gc
---
 store/gc.go     | 10 ++++-----
 store/hstore.go | 57 +++++++++++++++++++++++++++++++++----------------
 2 files changed, 44 insertions(+), 23 deletions(-)

diff --git a/store/gc.go b/store/gc.go
index 408d5b0..676585c 100644
--- a/store/gc.go
+++ b/store/gc.go
@@ -10,7 +10,7 @@ import (
 
 type GCMgr struct {
 	mu   sync.RWMutex
-	stat map[int]*GCState // map[bucketID]*GCState
+	stat map[*Bucket]*GCState // per-bucket state of running GCs, keyed by *Bucket
 }
 
 type GCState struct {
@@ -143,7 +143,7 @@ func (bkt *Bucket) gcCheckEnd(start, endChunkID, noGCDays int) (end int, err err
 	}
 	if time.Now().Unix()-ts > int64(noGCDays)*86400 {
 		for end = next - 1; end >= start; end-- {
-			if bkt.datas.chunks[end].size >= 0 {
+			if bkt.datas.chunks[end].size > 0 {
 				return
 			}
 		}
@@ -165,7 +165,7 @@ func (bkt *Bucket) gcCheckStart(startChunkID int) (start int, err error) {
 		start = startChunkID
 	}
 	for ; start < bkt.datas.newHead; start++ {
-		if bkt.datas.chunks[start].size >= 0 {
+		if bkt.datas.chunks[start].size > 0 {
 			break
 		}
 	}
@@ -193,13 +193,13 @@ func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) {
 	gc := &bkt.GCHistory[len(bkt.GCHistory)-1]
 	// add gc to mgr's stat map
 	mgr.mu.Lock()
-	mgr.stat[bkt.ID] = gc
+	mgr.stat[bkt] = gc
 	mgr.mu.Unlock()
 	gc.Running = true
 	gc.BeginTS = time.Now()
 	defer func() {
 		mgr.mu.Lock()
-		delete(mgr.stat, bkt.ID)
+		delete(mgr.stat, bkt)
 		mgr.mu.Unlock()
 		gc.Running = false
 		gc.EndTS = time.Now()
diff --git a/store/hstore.go b/store/hstore.go
index 553478c..aba3334 100644
--- a/store/hstore.go
+++ b/store/hstore.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -85,7 +84,7 @@ func NewHStore() (store *HStore, err error) {
 	cmem.DBRL.ResetAll()
 	st := time.Now()
 	store = new(HStore)
-	store.gcMgr = &GCMgr{stat: make(map[int]*GCState)}
+	store.gcMgr = &GCMgr{stat: make(map[*Bucket]*GCState)}
 	store.buckets = make([]*Bucket, Conf.NumBucket)
 	for i := 0; i < Conf.NumBucket; i++ {
 		store.buckets[i] = &Bucket{}
@@ -246,17 +245,41 @@ func (store *HStore) ListDir(ki *KeyInfo) ([]byte, error) {
 	return store.ListUpper(ki)
 }
 
-func (store *HStore) GCBuckets() []string {
+func (store *HStore) GCBuckets() map[string][]string {
+
+	fetchRemain := func(current, end int, b *Bucket) int {
+		var cnt int
+		for i := current; i <= end; i++ {
+			if b.datas.chunks[i].size > 0 {
+				cnt++
+			}
+		}
+		return cnt
+	}
 	store.gcMgr.mu.Lock()
-	result := make([]string, 0, len(store.gcMgr.stat))
-	for k := range store.gcMgr.stat {
-		result = append(result, strconv.FormatInt(int64(k), 16))
+	result := make(map[string][]string)
+	for bkt, st := range store.gcMgr.stat {
+		disk, _ := utils.DiskUsage(bkt.Home)
+		remain := fetchRemain(st.Src, st.End, bkt)
+		gcResult := fmt.Sprintf("bkt: %02x, start -> %d, end -> %d, remain -> %d", bkt.ID, st.Begin, st.End, remain)
+		result[disk.Root] = append(result[disk.Root], gcResult)
 	}
 	store.gcMgr.mu.Unlock()
 	return result
 }
 
+func (store 
*HStore) getBucket(bucketID int) *Bucket {
+	if bucketID < 0 || bucketID >= len(store.buckets) {
+		return nil
+	}
+	bkt := store.buckets[bucketID]
+	if bkt.State != BUCKET_STAT_READY {
+		return nil
+	}
+	return bkt
+}
+
 func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, pretend bool) (begin, end int, err error) {
 	if bucketID >= Conf.NumBucket {
 		err = fmt.Errorf("bad bucket id: %d", bucketID)
@@ -264,14 +287,14 @@ func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge,
 	}
 	bkt := store.buckets[bucketID]
 	if bkt.State != BUCKET_STAT_READY {
-		err = fmt.Errorf("no datay for bucket id: %d", bucketID)
+		err = fmt.Errorf("no data for bucket id: %d", bucketID)
 		return
 	}
 
 	checkGC := func() error {
 		store.gcMgr.mu.RLock()
 		defer store.gcMgr.mu.RUnlock()
-		if _, exists := store.gcMgr.stat[bucketID]; exists {
+		if _, exists := store.gcMgr.stat[bkt]; exists {
 			err := fmt.Errorf("gc on bkt: %d already running", bucketID)
 			return err
 		}
@@ -297,10 +320,15 @@ func (store *HStore) CancelGC(bucketID int) (src, dst int) {
 	// prevent same bucket concurrent request
 	gcLock.Lock()
 	defer gcLock.Unlock()
-
+	bkt := store.getBucket(bucketID)
+	if bkt == nil {
+		logger.Warnf("bucket %02x not found or not ready", bucketID)
+		src, dst = -1, -1
+		return
+	}
 	// will delete key at goroutine in store/gc.go
 	store.gcMgr.mu.RLock()
-	stat, exists := store.gcMgr.stat[bucketID]
+	stat, exists := store.gcMgr.stat[bkt]
 	store.gcMgr.mu.RUnlock()
 
 	if exists {
@@ -324,14 +352,7 @@ func (store *HStore) IsGCRunning() bool {
 }
 
 func (store *HStore) GetBucketInfo(bucketID int) *BucketInfo {
-	if bucketID < 0 || bucketID >= len(store.buckets) {
-		return nil
-	}
-	bkt := store.buckets[bucketID]
-	if bkt.State != BUCKET_STAT_READY {
-		return nil
-	}
-	return bkt.getInfo()
+	bkt := store.getBucket(bucketID)
+	if bkt == nil {
+		return nil
+	}
+	return bkt.getInfo()
 }
 
 func (store *HStore) Get(ki *KeyInfo, memOnly bool) (payload *Payload, pos Position, err error) {
From f1b79ea0d942d29eb7e0f471ee88ef553bd145a2 Mon Sep 17 00:00:00 2001
From: Yijun Zhao
Date: Sat, 12 Oct 2019 16:34:46 +0800
Subject: [PATCH 15/15] update travis ci (#29)

---
 .travis.yml        | 2 ++
 go.mod             | 3 +++
 go.sum             | 8 ++++++++
 utils/dist_test.go | 6 ++++--
 4 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 2630334..f8503bd 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,6 +6,8 @@ os:
 
 go:
   - "1.11.x"
+  - "1.12.x"
+  - "1.13.x"
 
 env:
   - GO111MODULE=on
diff --git a/go.mod b/go.mod
index a2e161b..a4159e3 100644
--- a/go.mod
+++ b/go.mod
@@ -3,5 +3,8 @@ module github.com/douban/gobeansdb
 require (
 	github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
 	github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
+	golang.org/x/tools v0.0.0-20191011211836-4c025a95b26e // indirect
 	gopkg.in/yaml.v2 v2.2.1
 )
+
+go 1.13
diff --git a/go.sum b/go.sum
index a9a12c9..dee75fa 100644
--- a/go.sum
+++ b/go.sum
@@ -2,6 +2,14 @@ github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9L
 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20191011211836-4c025a95b26e h1:1o2bDs9pCd2xFhdwqJTrCIswAeEsn4h/PCNelWpfcsI= +golang.org/x/tools v0.0.0-20191011211836-4c025a95b26e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= diff --git a/utils/dist_test.go b/utils/dist_test.go index 45e683c..e5a2175 100644 --- a/utils/dist_test.go +++ b/utils/dist_test.go @@ -1,7 +1,9 @@ package utils -import "testing" -import "os" +import ( + "os" + "testing" +) func TestDir(t *testing.T) { p := "testdir"
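
With [PATCH 13/15] and [PATCH 14/15] applied, GC progress can be polled and cancelled programmatically. Below is a minimal sketch of how an admin tool might drive that API; it is illustrative only, assuming store.Conf filled in via Conf.InitDefault() (as TestChunk256 above does) is sufficient to open a local store. Only NewHStore, GCBuckets, CancelGC, and Close are taken from the patches themselves.

package main

import (
	"fmt"

	"github.com/douban/gobeansdb/store"
)

func main() {
	// Fill in default settings, as the tests above do; a real deployment
	// would load its YAML config instead.
	store.Conf.InitDefault()

	// NewHStore opens every configured bucket under the store's home dirs.
	hstore, err := store.NewHStore()
	if err != nil {
		panic(err)
	}
	defer hstore.Close()

	// GCBuckets groups in-flight GC runs by the disk root of each bucket;
	// every entry is a summary string of the form
	// "bkt: 0a, start -> 1, end -> 3, remain -> 2".
	for root, runs := range hstore.GCBuckets() {
		for _, summary := range runs {
			fmt.Printf("%s: %s\n", root, summary)
		}
	}

	// CancelGC asks a running GC on bucket 0 to stop and reports the
	// src/dst chunks it was working on; an invalid or unready bucket
	// yields (-1, -1).
	src, dst := hstore.CancelGC(0)
	fmt.Println("gc cancelled at chunks", src, dst)
}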