diff --git a/server/config/config.go b/server/config/config.go index 5f3a0d8e9f5..0c73797912c 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -117,6 +117,7 @@ type ServerConfig struct { AutoCompactionRetention time.Duration AutoCompactionMode string + AutoCompactionInterval time.Duration CompactionBatchLimit int CompactionSleepInterval time.Duration QuotaBackendBytes int64 diff --git a/server/embed/config.go b/server/embed/config.go index b10c5dc52c6..6cd3733165e 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -272,6 +272,9 @@ type Config struct { // If no time unit is provided and compaction mode is 'periodic', // the unit defaults to hour. For example, '5' translates into 5-hour. AutoCompactionRetention string `json:"auto-compaction-retention"` + // AutoCompactionInterval is the delay between compaction runs. + // If no interval is specified 'periodic' defaults to retention, revision defaults to 5 minutes + AutoCompactionInterval string `json:"auto-compaction-interval"` // GRPCKeepAliveMinTime is the minimum interval that a client should // wait before pinging server. When client pings "too fast", server @@ -724,6 +727,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) { fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.") fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.") + fs.StringVar(&cfg.AutoCompactionInterval, "auto-compaction-interval", "", "Auto compaction interval for mvcc key value store. Default is based on mode selected.") // pprof profiler via HTTP fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"") diff --git a/server/embed/config_test.go b/server/embed/config_test.go index 73769408cd9..430409aa805 100644 --- a/server/embed/config_test.go +++ b/server/embed/config_test.go @@ -533,6 +533,35 @@ func TestAutoCompactionModeParse(t *testing.T) { } } +func TestAutoCompactionIntervalParse(t *testing.T) { + tests := []struct { + interval string + werr bool + wdur time.Duration + }{ + {"", false, 0}, + {"1", true, 0}, + {"1h", false, time.Hour}, + {"1s", false, time.Second}, + {"a", true, 0}, + {"-1", true, 0}, + } + + hasErr := func(err error) bool { + return err != nil + } + + for i, tt := range tests { + dur, err := parseCompactionInterval(tt.interval) + if hasErr(err) != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if dur != tt.wdur { + t.Errorf("#%d: duration = %s, want %s", i, dur, tt.wdur) + } + } +} + func TestPeerURLsMapAndTokenFromSRV(t *testing.T) { defer func() { getCluster = srv.GetCluster }() diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 80cc1cc3bf2..bf31ed52f3d 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -165,6 +165,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { return e, err } + autoCompactionInterval, err := parseCompactionInterval(cfg.AutoCompactionInterval) + if err != nil { + return e, err + } + backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType) srvcfg := config.ServerConfig{ @@ -189,6 +194,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { InitialElectionTickAdvance: cfg.InitialElectionTickAdvance, AutoCompactionRetention: autoCompactionRetention, AutoCompactionMode: cfg.AutoCompactionMode, + AutoCompactionInterval: autoCompactionInterval, QuotaBackendBytes: cfg.QuotaBackendBytes, BackendBatchLimit: cfg.BackendBatchLimit, BackendFreelistType: backendFreelistType, @@ -892,6 +898,13 @@ func (e *Etcd) GetLogger() *zap.Logger { return l } +func parseCompactionInterval(interval string) (ret time.Duration, err error) { + if interval == "" { + return ret, nil + } + return time.ParseDuration(interval) +} + func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) { h, err := strconv.Atoi(retention) if err == nil && h >= 0 { diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 52b8d3f92e3..609d9f19e1b 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -168,6 +168,8 @@ Clustering: Auto compaction retention length. 0 means disable auto compaction. --auto-compaction-mode 'periodic' Interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention. + --auto-compaction-interval '' + Auto compaction interval. Empty means use default based on mode selected. --v2-deprecation '` + string(cconfig.V2DeprDefault) + `' Phase of v2store deprecation. Allows to opt-in for higher compatibility mode. Supported values: diff --git a/server/etcdserver/api/v3compactor/compactor.go b/server/etcdserver/api/v3compactor/compactor.go index f916e71141b..374cd44f743 100644 --- a/server/etcdserver/api/v3compactor/compactor.go +++ b/server/etcdserver/api/v3compactor/compactor.go @@ -56,6 +56,7 @@ func New( lg *zap.Logger, mode string, retention time.Duration, + interval time.Duration, rg RevGetter, c Compactable, ) (Compactor, error) { @@ -64,9 +65,9 @@ func New( } switch mode { case ModePeriodic: - return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil + return newPeriodic(lg, clockwork.NewRealClock(), retention, interval, rg, c), nil case ModeRevision: - return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil + return newRevision(lg, clockwork.NewRealClock(), int64(retention), interval, rg, c), nil default: return nil, fmt.Errorf("unsupported compaction mode %s", mode) } diff --git a/server/etcdserver/api/v3compactor/periodic.go b/server/etcdserver/api/v3compactor/periodic.go index 98fbc381bb8..f84dddc31ae 100644 --- a/server/etcdserver/api/v3compactor/periodic.go +++ b/server/etcdserver/api/v3compactor/periodic.go @@ -29,9 +29,10 @@ import ( // Periodic compacts the log by purging revisions older than // the configured retention time. type Periodic struct { - lg *zap.Logger - clock clockwork.Clock - period time.Duration + lg *zap.Logger + clock clockwork.Clock + period time.Duration + interval time.Duration rg RevGetter c Compactable @@ -47,13 +48,14 @@ type Periodic struct { // newPeriodic creates a new instance of Periodic compactor that purges // the log older than h Duration. -func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { +func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, interval time.Duration, rg RevGetter, c Compactable) *Periodic { pc := &Periodic{ - lg: lg, - clock: clock, - period: h, - rg: rg, - c: c, + lg: lg, + clock: clock, + period: h, + interval: interval, + rg: rg, + c: c, } // revs won't be longer than the retentions. pc.revs = make([]int64, 0, pc.getRetentions()) @@ -161,11 +163,15 @@ func (pc *Periodic) Run() { }() } +// if static interval is provided, compact every x duration. // if given compaction period x is <1-hour, compact every x duration. // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) // if given compaction period x is >1-hour, compact every hour. // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) func (pc *Periodic) getCompactInterval() time.Duration { + if pc.interval != 0 { + return pc.interval + } itv := pc.period if itv > time.Hour { itv = time.Hour diff --git a/server/etcdserver/api/v3compactor/periodic_test.go b/server/etcdserver/api/v3compactor/periodic_test.go index 5053482a807..88044d0a4d3 100644 --- a/server/etcdserver/api/v3compactor/periodic_test.go +++ b/server/etcdserver/api/v3compactor/periodic_test.go @@ -30,12 +30,13 @@ import ( func TestPeriodicHourly(t *testing.T) { retentionHours := 2 retentionDuration := time.Duration(retentionHours) * time.Hour + intervalDuration := time.Duration(0) fc := clockwork.NewFakeClock() // TODO: Do not depand or real time (Recorder.Wait) in unit tests. rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) tb.Run() defer tb.Stop() @@ -82,11 +83,12 @@ func TestPeriodicHourly(t *testing.T) { func TestPeriodicMinutes(t *testing.T) { retentionMinutes := 5 retentionDuration := time.Duration(retentionMinutes) * time.Minute + intervalDuration := time.Duration(0) fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) tb.Run() defer tb.Stop() @@ -129,12 +131,64 @@ func TestPeriodicMinutes(t *testing.T) { } } +func TestPeriodicMinutesWithInterval(t *testing.T) { + retentionMinutes := 10 + retentionDuration := time.Duration(retentionMinutes) * time.Minute + intervalDuration := 2 * time.Minute + + fc := clockwork.NewFakeClock() + rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} + compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) + + tb.Run() + defer tb.Stop() + + // compaction doesn't happen til 10 minutes elapse + for i := 0; i < retentionMinutes; i++ { + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) + } + + // very first compaction + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(1) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + + for i := 0; i < 10; i++ { + // advance 20 minutes, one revision for each minute + for j := 0; j < 20; j++ { + rg.Wait(1) + fc.Advance(1 * time.Minute) + } + + // compact + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + + // the expected revision is the current revision minus the retention duration + // since we made a revision every minute + expectedRevision := rg.rev - int64(retentionDuration.Minutes()) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + } +} + func TestPeriodicPause(t *testing.T) { fc := clockwork.NewFakeClock() retentionDuration := time.Hour + intervalDuration := time.Duration(0) rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) tb.Run() tb.Pause() @@ -177,11 +231,12 @@ func TestPeriodicPause(t *testing.T) { func TestPeriodicSkipRevNotChange(t *testing.T) { retentionMinutes := 5 retentionDuration := time.Duration(retentionMinutes) * time.Minute + intervalDuration := time.Duration(0) fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) + tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable) tb.Run() defer tb.Stop() diff --git a/server/etcdserver/api/v3compactor/revision.go b/server/etcdserver/api/v3compactor/revision.go index 326ac211d0b..b225de5ed1a 100644 --- a/server/etcdserver/api/v3compactor/revision.go +++ b/server/etcdserver/api/v3compactor/revision.go @@ -33,6 +33,7 @@ type Revision struct { clock clockwork.Clock retention int64 + interval time.Duration rg RevGetter c Compactable @@ -46,11 +47,16 @@ type Revision struct { // newRevision creates a new instance of Revisonal compactor that purges // the log older than retention revisions from the current revision. -func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision { +func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, interval time.Duration, rg RevGetter, c Compactable) *Revision { + // default revision interval to 5 minutes + if interval == 0 { + interval = time.Minute * 5 + } rc := &Revision{ lg: lg, clock: clock, retention: retention, + interval: interval, rg: rg, c: c, } @@ -58,8 +64,6 @@ func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevG return rc } -const revInterval = 5 * time.Minute - // Run runs revision-based compactor. func (rc *Revision) Run() { prev := int64(0) @@ -68,7 +72,7 @@ func (rc *Revision) Run() { select { case <-rc.ctx.Done(): return - case <-rc.clock.After(revInterval): + case <-rc.clock.After(rc.interval): rc.mu.Lock() p := rc.paused rc.mu.Unlock() @@ -102,7 +106,7 @@ func (rc *Revision) Run() { "failed auto revision compaction", zap.Int64("revision", rev), zap.Int64("revision-compaction-retention", rc.retention), - zap.Duration("retry-interval", revInterval), + zap.Duration("retry-interval", rc.interval), zap.Error(err), ) } diff --git a/server/etcdserver/api/v3compactor/revision_test.go b/server/etcdserver/api/v3compactor/revision_test.go index 54e25f2b88c..981525deaeb 100644 --- a/server/etcdserver/api/v3compactor/revision_test.go +++ b/server/etcdserver/api/v3compactor/revision_test.go @@ -26,22 +26,24 @@ import ( "go.etcd.io/etcd/client/pkg/v3/testutil" ) +const testRevInterval = 5 * time.Minute + func TestRevision(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} - tb := newRevision(zaptest.NewLogger(t), fc, 10, rg, compactable) + tb := newRevision(zaptest.NewLogger(t), fc, 10, testRevInterval, rg, compactable) tb.Run() defer tb.Stop() - fc.Advance(revInterval) + fc.Advance(testRevInterval) rg.Wait(1) // nothing happens rg.SetRev(99) // will be 100 expectedRevision := int64(90) - fc.Advance(revInterval) + fc.Advance(testRevInterval) rg.Wait(1) a, err := compactable.Wait(1) if err != nil { @@ -58,7 +60,7 @@ func TestRevision(t *testing.T) { rg.SetRev(199) // will be 200 expectedRevision = int64(190) - fc.Advance(revInterval) + fc.Advance(testRevInterval) rg.Wait(1) a, err = compactable.Wait(1) if err != nil { @@ -73,15 +75,15 @@ func TestRevisionPause(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100 compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newRevision(zaptest.NewLogger(t), fc, 10, rg, compactable) + tb := newRevision(zaptest.NewLogger(t), fc, 10, testRevInterval, rg, compactable) tb.Run() tb.Pause() // tb will collect 3 hours of revisions but not compact since paused - n := int(time.Hour / revInterval) + n := int(time.Hour / testRevInterval) for i := 0; i < 3*n; i++ { - fc.Advance(revInterval) + fc.Advance(testRevInterval) } // tb ends up waiting for the clock @@ -95,7 +97,7 @@ func TestRevisionPause(t *testing.T) { tb.Resume() // unblock clock, will kick off a compaction at hour 3:05 - fc.Advance(revInterval) + fc.Advance(testRevInterval) rg.Wait(1) a, err := compactable.Wait(1) if err != nil { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 0600a31b896..b2d06c65d0c 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -383,7 +383,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() if num := cfg.AutoCompactionRetention; num != 0 { - srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv) + srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, cfg.AutoCompactionInterval, srv.kv, srv) if err != nil { return nil, err }