Skip to content

Commit

Permalink
release gc cancel (#25)
Browse files Browse the repository at this point in the history
release gc cancel
  • Loading branch information
ariesdevil authored Apr 24, 2019
1 parent c523d09 commit 06d90e2
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 21 deletions.
40 changes: 29 additions & 11 deletions gobeansdb/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"runtime/debug"
"strconv"

yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"

"github.com/douban/gobeansdb/cmem"
"github.com/douban/gobeansdb/config"
Expand Down Expand Up @@ -55,7 +55,6 @@ func init() {

http.HandleFunc("/statgetset", handleStatGetSet)
http.HandleFunc("/freememory", handleFreeMemory)

}

func initWeb() {
Expand Down Expand Up @@ -314,6 +313,15 @@ func handleGC(w http.ResponseWriter, r *http.Request) {
var err error
var bucketID int64
var pretend bool

isQuery := getGCQuery(r)
if isQuery {
gcResult := storage.hstore.GCBuckets()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(gcResult)
return
}

defer func() {
if err != nil {
e := fmt.Sprintf("<p> err : %s </p>", err.Error())
Expand All @@ -328,17 +336,16 @@ func handleGC(w http.ResponseWriter, r *http.Request) {
}
}()

isQuery := getGCQuery(r)
if isQuery {
result = storage.hstore.GCBuckets()
bucketID, err = getBucket(r)
if err != nil {
return
}
bktID := int(bucketID)

bucketID, err = getBucket(r)
err = r.ParseForm()
if err != nil {
return
}
r.ParseForm()
start, err := getFormValueInt(r, "start", -1)
if err != nil {
return
Expand All @@ -353,16 +360,27 @@ func handleGC(w http.ResponseWriter, r *http.Request) {
}

s := r.FormValue("run")
pretend = (s != "true")
pretend = s != "true"

s = r.FormValue("merge")
merge := (s == "true")
merge := s == "true"

s = r.FormValue("cancel")
gcCancel := s == "true"

start, end, err = storage.hstore.GC(int(bucketID), start, end, noGCDays, merge, pretend)
if err == nil {
if gcCancel {
src, dst := storage.hstore.CancelGC(bktID)
if src == -1 {
result = fmt.Sprintf("bucket %d is not gcing", bktID)
} else {
result = fmt.Sprintf("cancel gc on bucket %d, src: %d -> dst: %d", bktID, src, dst)
}
} else {
start, end, err = storage.hstore.GC(bktID, start, end, noGCDays, merge, pretend)
result = fmt.Sprintf("<p/> bucket %d, start %d, end %d, merge %v, pretend %v <p/>",
bucketID, start, end, merge, pretend)
}
return
}

func handleDU(w http.ResponseWriter, r *http.Request) {
Expand Down
2 changes: 2 additions & 0 deletions store/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func (ds *dataStore) flush(chunk int, force bool) error {
filessize, w.offset, ds.genPath(chunk), &ds.chunks[chunk])
}
nflushed, err := ds.chunks[chunk].flush(w, false)
ds.Lock()
ds.wbufSize -= nflushed
ds.Unlock()
w.Close()

return nil
Expand Down
23 changes: 19 additions & 4 deletions store/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type GCState struct {
Src int
Dst int

Err error
Err error
CancelFlag bool
// sum
GCFileState
}
Expand Down Expand Up @@ -210,10 +211,20 @@ func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) {
defer mgr.AfterBucket(bkt)

gc.Dst = startChunkID
for i := 0; i < startChunkID; i++ {
// try to find the nearest chunk that small than start chunk
for i := startChunkID - 1; i >= 0; i-- {
sz := bkt.datas.chunks[i].size
if sz > 0 && (int64(sz) < Conf.DataFileMax-config.MCConf.BodyMax) {
gc.Dst = i
if sz > 0 {
if int64(sz) < Conf.DataFileMax-config.MCConf.BodyMax {
// previous one available and not full, use it.
gc.Dst = i
break
} else {
if i < startChunkID-1 { // not previous one
gc.Dst = i + 1
}
break
}
}
}

Expand All @@ -230,6 +241,10 @@ func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) {
}()

for gc.Src = gc.Begin; gc.Src <= gc.End; gc.Src++ {
if gc.CancelFlag {
logger.Infof("GC canceled: src %d dst %d", gc.Src, gc.Dst)
return
}
if bkt.datas.chunks[gc.Src].size <= 0 {
logger.Infof("skip empty chunk %d", gc.Src)
continue
Expand Down
32 changes: 26 additions & 6 deletions store/hstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -20,6 +19,7 @@ import (
var (
logger = loghub.ErrorLogger
mergeChan chan int
gcLock sync.Mutex
)

type HStore struct {
Expand Down Expand Up @@ -246,15 +246,15 @@ func (store *HStore) ListDir(ki *KeyInfo) ([]byte, error) {
return store.ListUpper(ki)
}

func (store *HStore) GCBuckets() string {
var result strings.Builder
func (store *HStore) GCBuckets() []string {

store.gcMgr.mu.Lock()
result := make([]string, 0, len(store.gcMgr.stat))
for k := range store.gcMgr.stat {
result.WriteString(strconv.FormatInt(int64(k), 16))
result.WriteString(",")
result = append(result, strconv.FormatInt(int64(k), 16))
}
store.gcMgr.mu.Unlock()
return strings.TrimSuffix(result.String(), ",")
return result
}

func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, pretend bool) (begin, end int, err error) {
Expand Down Expand Up @@ -288,10 +288,30 @@ func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge,
if pretend {
return
}

go store.gcMgr.gc(bkt, begin, end, merge)
return
}

func (store *HStore) CancelGC(bucketID int) (src, dst int) {
// prevent same bucket concurrent request
gcLock.Lock()
defer gcLock.Unlock()

// will delete key at goroutine in store/gc.go
store.gcMgr.mu.RLock()
stat, exists := store.gcMgr.stat[bucketID]
store.gcMgr.mu.RUnlock()

if exists {
stat.CancelFlag = true
src, dst = stat.Src, stat.Dst
} else {
src, dst = -1, -1
}
return
}

func (store *HStore) IsGCRunning() bool {
var result bool
store.gcMgr.mu.RLock()
Expand Down
1 change: 1 addition & 0 deletions store/hstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,7 @@ func testGC(t *testing.T, casefunc testGCFunc, name string, numRecPerFile int) {
store.Close()
checkAllDataWithHints(bucketDir)
}

func TestGCConcurrency(t *testing.T) {
testGCConcurrency(t, 10000)
}
Expand Down

0 comments on commit 06d90e2

Please sign in to comment.