diff --git a/.travis.yml b/.travis.yml index 2630334..f8503bd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,8 @@ os: go: - "1.11.x" + - "1.12.x" + - "1.13.x" env: - GO111MODULE=on diff --git a/go.mod b/go.mod index a2e161b..a4159e3 100644 --- a/go.mod +++ b/go.mod @@ -3,5 +3,8 @@ module github.com/douban/gobeansdb require ( github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 + golang.org/x/tools v0.0.0-20191011211836-4c025a95b26e // indirect gopkg.in/yaml.v2 v2.2.1 ) + +go 1.13 diff --git a/go.sum b/go.sum index a9a12c9..dee75fa 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,14 @@ github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9L github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20191011211836-4c025a95b26e h1:1o2bDs9pCd2xFhdwqJTrCIswAeEsn4h/PCNelWpfcsI= +golang.org/x/tools v0.0.0-20191011211836-4c025a95b26e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= diff --git a/store/gc.go b/store/gc.go index 59b743e..676585c 100644 --- a/store/gc.go +++ b/store/gc.go @@ -10,7 +10,7 @@ import ( type GCMgr struct { mu sync.RWMutex - stat map[int]*GCState // map[bucketID]*GCState + stat map[*Bucket]*GCState // map[bucketID]*GCState } type GCState struct { @@ -25,6 +25,9 @@ type GCState struct { Src int Dst int + // For beansdbadmin check status. + Running bool + Err error CancelFlag bool // sum @@ -140,7 +143,7 @@ func (bkt *Bucket) gcCheckEnd(start, endChunkID, noGCDays int) (end int, err err } if time.Now().Unix()-ts > int64(noGCDays)*86400 { for end = next - 1; end >= start; end-- { - if bkt.datas.chunks[end].size >= 0 { + if bkt.datas.chunks[end].size > 0 { return } } @@ -162,7 +165,7 @@ func (bkt *Bucket) gcCheckStart(startChunkID int) (start int, err error) { start = startChunkID } for ; start < bkt.datas.newHead; start++ { - if bkt.datas.chunks[start].size >= 0 { + if bkt.datas.chunks[start].size > 0 { break } } @@ -190,13 +193,15 @@ func (mgr *GCMgr) gc(bkt *Bucket, startChunkID, endChunkID int, merge bool) { gc := &bkt.GCHistory[len(bkt.GCHistory)-1] // add gc to mgr's stat map mgr.mu.Lock() - mgr.stat[bkt.ID] = gc + mgr.stat[bkt] = gc mgr.mu.Unlock() + gc.Running = true gc.BeginTS = time.Now() defer func() { mgr.mu.Lock() - delete(mgr.stat, bkt.ID) + delete(mgr.stat, bkt) mgr.mu.Unlock() + gc.Running = false gc.EndTS = time.Now() }() gc.Begin = startChunkID diff --git a/store/hstore.go b/store/hstore.go index 553478c..aba3334 100644 --- a/store/hstore.go +++ b/store/hstore.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" "sync" "sync/atomic" "time" @@ -85,7 +84,7 @@ func NewHStore() (store *HStore, err error) { cmem.DBRL.ResetAll() st := time.Now() store = new(HStore) - store.gcMgr = &GCMgr{stat: make(map[int]*GCState)} + store.gcMgr = &GCMgr{stat: make(map[*Bucket]*GCState)} store.buckets = make([]*Bucket, Conf.NumBucket) for i := 0; i < Conf.NumBucket; i++ { store.buckets[i] = &Bucket{} @@ -246,17 +245,41 @@ func (store *HStore) ListDir(ki *KeyInfo) ([]byte, error) { return store.ListUpper(ki) } -func (store *HStore) GCBuckets() []string { +func (store *HStore) GCBuckets() map[string][]string { + + fetchRemain := func(current, end int, b *Bucket) int { + var cnt int + for i := current; i <= end; i++ { + if b.datas.chunks[i].size > 0 { + cnt++ + } + } + return cnt + } store.gcMgr.mu.Lock() - result := make([]string, 0, len(store.gcMgr.stat)) - for k := range store.gcMgr.stat { - result = append(result, strconv.FormatInt(int64(k), 16)) + result := make(map[string][]string) + for bkt, st := range store.gcMgr.stat { + disk, _ := utils.DiskUsage(bkt.Home) + remain := fetchRemain(st.Src, st.End, bkt) + gcResult := fmt.Sprintf("bkt: %02x, start -> %d, end -> %d, remain -> %d", bkt.ID, st.Begin, st.End, remain) + result[disk.Root] = append(result[disk.Root], gcResult) } store.gcMgr.mu.Unlock() return result } +func (store *HStore) getBucket(bucketID int) *Bucket { + if bucketID < 0 || bucketID >= len(store.buckets) { + return nil + } + bkt := store.buckets[bucketID] + if bkt.State != BUCKET_STAT_READY { + return nil + } + return bkt +} + func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, pretend bool) (begin, end int, err error) { if bucketID >= Conf.NumBucket { err = fmt.Errorf("bad bucket id: %d", bucketID) @@ -264,14 +287,14 @@ func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, } bkt := store.buckets[bucketID] if bkt.State != BUCKET_STAT_READY { - err = fmt.Errorf("no datay for bucket id: %d", bucketID) + err = fmt.Errorf("no data for bucket id: %d", bucketID) return } checkGC := func() error { store.gcMgr.mu.RLock() defer store.gcMgr.mu.RUnlock() - if _, exists := store.gcMgr.stat[bucketID]; exists { + if _, exists := store.gcMgr.stat[bkt]; exists { err := fmt.Errorf("gc on bkt: %d already running", bucketID) return err } @@ -297,10 +320,15 @@ func (store *HStore) CancelGC(bucketID int) (src, dst int) { // prevent same bucket concurrent request gcLock.Lock() defer gcLock.Unlock() - + bkt := store.getBucket(bucketID) + if bkt == nil { + logger.Warnf("get bucket: %02x error", bucketID) + src, dst = -1, -1 + return + } // will delete key at goroutine in store/gc.go store.gcMgr.mu.RLock() - stat, exists := store.gcMgr.stat[bucketID] + stat, exists := store.gcMgr.stat[bkt] store.gcMgr.mu.RUnlock() if exists { @@ -324,14 +352,7 @@ func (store *HStore) IsGCRunning() bool { } func (store *HStore) GetBucketInfo(bucketID int) *BucketInfo { - if bucketID < 0 || bucketID >= len(store.buckets) { - return nil - } - bkt := store.buckets[bucketID] - if bkt.State != BUCKET_STAT_READY { - return nil - } - return bkt.getInfo() + return store.getBucket(bucketID).getInfo() } func (store *HStore) Get(ki *KeyInfo, memOnly bool) (payload *Payload, pos Position, err error) { diff --git a/utils/dist_test.go b/utils/dist_test.go index 45e683c..e5a2175 100644 --- a/utils/dist_test.go +++ b/utils/dist_test.go @@ -1,7 +1,9 @@ package utils -import "testing" -import "os" +import ( + "os" + "testing" +) func TestDir(t *testing.T) { p := "testdir"