Skip to content

Commit

Permalink
Merge pull request #18514 from JalinWang/backport/release-3.5
Browse files Browse the repository at this point in the history
[3.5] Introduce the CompactionSleepInterval flag
  • Loading branch information
ahrtr authored Sep 2, 2024
2 parents 82994d1 + 0263597 commit 1d4372a
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 10 deletions.
1 change: 1 addition & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type ServerConfig struct {
AutoCompactionRetention time.Duration
AutoCompactionMode string
CompactionBatchLimit int
CompactionSleepInterval time.Duration
QuotaBackendBytes int64
MaxTxnOps uint

Expand Down
8 changes: 5 additions & 3 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,11 @@ type Config struct {
// Requires experimental-enable-lease-checkpoint to be enabled.
// Deprecated in v3.6.
// TODO: Delete in v3.7
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
Expand Down
1 change: 1 addition & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func newConfig() *config {
// TODO: delete in v3.7
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ Experimental feature:
Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.
--experimental-compaction-batch-limit 1000
ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
--experimental-compaction-sleep-interval '10ms'
ExperimentalCompactionSleepInterval sets the sleep interval between each compaction batch.
--experimental-peer-skip-client-san-verification 'false'
Skip verification of SAN field in client certificate for peer connections.
--experimental-watch-progress-notify-interval '10m'
Expand Down
8 changes: 7 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})

mvccStoreConfig := mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

kvindex := ci.ConsistentIndex()
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))

if beExist {
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
// etcd from pre-3.0 release.
Expand Down
11 changes: 8 additions & 3 deletions server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ const (
)

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000
var defaultCompactionBatchLimit = 1000
var defaultCompactionSleepInterval = 10 * time.Millisecond

type StoreConfig struct {
CompactionBatchLimit int
CompactionBatchLimit int
CompactionSleepInterval time.Duration
}

type store struct {
Expand Down Expand Up @@ -94,7 +96,10 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
lg = zap.NewNop()
}
if cfg.CompactionBatchLimit == 0 {
cfg.CompactionBatchLimit = defaultCompactBatchLimit
cfg.CompactionBatchLimit = defaultCompactionBatchLimit
}
if cfg.CompactionSleepInterval == 0 {
cfg.CompactionSleepInterval = defaultCompactionSleepInterval
}
s := &store{
cfg: cfg,
Expand Down
7 changes: 5 additions & 2 deletions server/mvcc/kvstore_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))

batchNum := s.cfg.CompactionBatchLimit
batchTicker := time.NewTicker(s.cfg.CompactionSleepInterval)
defer batchTicker.Stop()
h := newKVHasher(prevCompactRev, compactMainRev, keep)
last := make([]byte, 8+1+8)

for {
var rev revision

Expand All @@ -58,7 +61,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
h.WriteKeyValue(keys[i], values[i])
}

if len(keys) < s.cfg.CompactionBatchLimit {
if len(keys) < batchNum {
// gofail: var compactBeforeSetFinishedCompact struct{}
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
Expand Down Expand Up @@ -87,7 +90,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))

select {
case <-time.After(10 * time.Millisecond):
case <-batchTicker.C:
case <-s.stopc:
return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
}
Expand Down
5 changes: 4 additions & 1 deletion server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,10 @@ func newFakeStore() *store {
Recorder: &testutil.RecorderBuffered{},
rangeRespc: make(chan rangeResp, 5)}}
s := &store{
cfg: StoreConfig{CompactionBatchLimit: 10000},
cfg: StoreConfig{
CompactionBatchLimit: 10000,
CompactionSleepInterval: defaultCompactionSleepInterval,
},
b: b,
le: &lease.FakeLessor{},
kvindex: newFakeIndex(),
Expand Down

0 comments on commit 1d4372a

Please sign in to comment.