diff --git a/gobeansdb/web.go b/gobeansdb/web.go index 2ec5524..4376299 100644 --- a/gobeansdb/web.go +++ b/gobeansdb/web.go @@ -10,7 +10,7 @@ import ( "runtime/debug" "strconv" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" "github.com/douban/gobeansdb/cmem" "github.com/douban/gobeansdb/config" @@ -55,7 +55,6 @@ func init() { http.HandleFunc("/statgetset", handleStatGetSet) http.HandleFunc("/freememory", handleFreeMemory) - } func initWeb() { @@ -314,6 +313,15 @@ func handleGC(w http.ResponseWriter, r *http.Request) { var err error var bucketID int64 var pretend bool + + isQuery := getGCQuery(r) + if isQuery { + gcResult := storage.hstore.GCBuckets() + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(gcResult) + return + } + defer func() { if err != nil { e := fmt.Sprintf("

err : %s

", err.Error()) @@ -328,17 +336,16 @@ func handleGC(w http.ResponseWriter, r *http.Request) { } }() - isQuery := getGCQuery(r) - if isQuery { - result = storage.hstore.GCBuckets() + bucketID, err = getBucket(r) + if err != nil { return } + bktID := int(bucketID) - bucketID, err = getBucket(r) + err = r.ParseForm() if err != nil { return } - r.ParseForm() start, err := getFormValueInt(r, "start", -1) if err != nil { return @@ -353,16 +360,27 @@ func handleGC(w http.ResponseWriter, r *http.Request) { } s := r.FormValue("run") - pretend = (s != "true") + pretend = s != "true" s = r.FormValue("merge") - merge := (s == "true") + merge := s == "true" + + s = r.FormValue("cancel") + gcCancel := s == "true" - start, end, err = storage.hstore.GC(int(bucketID), start, end, noGCDays, merge, pretend) - if err == nil { + if gcCancel { + src, dst := storage.hstore.CancelGC(bktID) + if src == -1 { + result = fmt.Sprintf("bucket %d is not gcing", bktID) + } else { + result = fmt.Sprintf("cancel gc on bucket %d, src: %d -> dst: %d", bktID, src, dst) + } + } else { + start, end, err = storage.hstore.GC(bktID, start, end, noGCDays, merge, pretend) result = fmt.Sprintf("

bucket %d, start %d, end %d, merge %v, pretend %v

", bucketID, start, end, merge, pretend) } + return } func handleDU(w http.ResponseWriter, r *http.Request) { diff --git a/store/data.go b/store/data.go index d5a6de7..78a348f 100644 --- a/store/data.go +++ b/store/data.go @@ -132,7 +132,9 @@ func (ds *dataStore) flush(chunk int, force bool) error { filessize, w.offset, ds.genPath(chunk), &ds.chunks[chunk]) } nflushed, err := ds.chunks[chunk].flush(w, false) + ds.Lock() ds.wbufSize -= nflushed + ds.Unlock() w.Close() return nil diff --git a/store/gc.go b/store/gc.go index 58b9a3a..59b743e 100644 --- a/store/gc.go +++ b/store/gc.go @@ -25,7 +25,8 @@ type GCState struct { Src int Dst int - Err error + Err error + CancelFlag bool // sum GCFileState } @@ -210,10 +211,20 @@ func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) { defer mgr.AfterBucket(bkt) gc.Dst = startChunkID - for i := 0; i < startChunkID; i++ { + // try to find the nearest chunk that small than start chunk + for i := startChunkID - 1; i >= 0; i-- { sz := bkt.datas.chunks[i].size - if sz > 0 && (int64(sz) < Conf.DataFileMax-config.MCConf.BodyMax) { - gc.Dst = i + if sz > 0 { + if int64(sz) < Conf.DataFileMax-config.MCConf.BodyMax { + // previous one available and not full, use it. + gc.Dst = i + break + } else { + if i < startChunkID-1 { // not previous one + gc.Dst = i + 1 + } + break + } } } @@ -230,6 +241,10 @@ func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) { }() for gc.Src = gc.Begin; gc.Src <= gc.End; gc.Src++ { + if gc.CancelFlag { + logger.Infof("GC canceled: src %d dst %d", gc.Src, gc.Dst) + return + } if bkt.datas.chunks[gc.Src].size <= 0 { logger.Infof("skip empty chunk %d", gc.Src) continue diff --git a/store/hstore.go b/store/hstore.go index dac7999..553478c 100644 --- a/store/hstore.go +++ b/store/hstore.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -20,6 +19,7 @@ import ( var ( logger = loghub.ErrorLogger mergeChan chan int + gcLock sync.Mutex ) type HStore struct { @@ -246,15 +246,15 @@ func (store *HStore) ListDir(ki *KeyInfo) ([]byte, error) { return store.ListUpper(ki) } -func (store *HStore) GCBuckets() string { - var result strings.Builder +func (store *HStore) GCBuckets() []string { + store.gcMgr.mu.Lock() + result := make([]string, 0, len(store.gcMgr.stat)) for k := range store.gcMgr.stat { - result.WriteString(strconv.FormatInt(int64(k), 16)) - result.WriteString(",") + result = append(result, strconv.FormatInt(int64(k), 16)) } store.gcMgr.mu.Unlock() - return strings.TrimSuffix(result.String(), ",") + return result } func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, pretend bool) (begin, end int, err error) { @@ -288,10 +288,30 @@ func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, if pretend { return } + go store.gcMgr.gc(bkt, begin, end, merge) return } +func (store *HStore) CancelGC(bucketID int) (src, dst int) { + // prevent same bucket concurrent request + gcLock.Lock() + defer gcLock.Unlock() + + // will delete key at goroutine in store/gc.go + store.gcMgr.mu.RLock() + stat, exists := store.gcMgr.stat[bucketID] + store.gcMgr.mu.RUnlock() + + if exists { + stat.CancelFlag = true + src, dst = stat.Src, stat.Dst + } else { + src, dst = -1, -1 + } + return +} + func (store *HStore) IsGCRunning() bool { var result bool store.gcMgr.mu.RLock() diff --git a/store/hstore_test.go b/store/hstore_test.go index 63bbcf8..3b8d2af 100644 --- a/store/hstore_test.go +++ b/store/hstore_test.go @@ -1001,6 +1001,7 @@ func testGC(t *testing.T, casefunc testGCFunc, name string, numRecPerFile int) { store.Close() checkAllDataWithHints(bucketDir) } + func TestGCConcurrency(t *testing.T) { testGCConcurrency(t, 10000) }